]> git.basschouten.com Git - openhab-addons.git/blob
3d5facefe1abd159276a3162f54926ebc92ef849
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2020 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.mqtt.generic;
14
15 import java.nio.charset.StandardCharsets;
16 import java.util.ArrayList;
17 import java.util.IllegalFormatException;
18 import java.util.List;
19 import java.util.concurrent.CompletableFuture;
20 import java.util.concurrent.ScheduledExecutorService;
21 import java.util.concurrent.ScheduledFuture;
22 import java.util.concurrent.TimeUnit;
23
24 import org.apache.commons.lang.StringUtils;
25 import org.eclipse.jdt.annotation.NonNullByDefault;
26 import org.eclipse.jdt.annotation.Nullable;
27 import org.openhab.binding.mqtt.generic.values.TextValue;
28 import org.openhab.binding.mqtt.generic.values.Value;
29 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
30 import org.openhab.core.io.transport.mqtt.MqttMessageSubscriber;
31 import org.openhab.core.library.types.StringType;
32 import org.openhab.core.thing.ChannelUID;
33 import org.openhab.core.types.Command;
34 import org.openhab.core.types.TypeParser;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
37
38 /**
39  * This object consists of an {@link Value}, which is updated on the respective MQTT topic change.
40  * Updates to the value are propagated via the {@link ChannelStateUpdateListener}.
41  *
42  * @author David Graeff - Initial contribution
43  */
44 @NonNullByDefault
45 public class ChannelState implements MqttMessageSubscriber {
46     private final Logger logger = LoggerFactory.getLogger(ChannelState.class);
47
48     // Immutable channel configuration
49     protected final boolean readOnly;
50     protected final ChannelUID channelUID;
51     protected final ChannelConfig config;
52
53     /** Channel value **/
54     protected final Value cachedValue;
55
56     // Runtime variables
57     private @Nullable MqttBrokerConnection connection;
58     protected final List<ChannelStateTransformation> transformationsIn = new ArrayList<>();
59     protected final List<ChannelStateTransformation> transformationsOut = new ArrayList<>();
60     private @Nullable ChannelStateUpdateListener channelStateUpdateListener;
61     protected boolean hasSubscribed = false;
62     private @Nullable ScheduledFuture<?> scheduledFuture;
63     private CompletableFuture<@Nullable Void> future = CompletableFuture.completedFuture(null);
64     private final Object futureLock = new Object();
65
66     /**
67      * Creates a new channel state.
68      *
69      * @param config The channel configuration
70      * @param channelUID The channelUID is used for the {@link ChannelStateUpdateListener} to notify about value changes
71      * @param cachedValue MQTT only notifies us once about a value, during the subscribe. The channel state therefore
72      *            needs a cache for the current value.
73      * @param channelStateUpdateListener A channel state update listener
74      */
75     public ChannelState(ChannelConfig config, ChannelUID channelUID, Value cachedValue,
76             @Nullable ChannelStateUpdateListener channelStateUpdateListener) {
77         this.config = config;
78         this.channelStateUpdateListener = channelStateUpdateListener;
79         this.channelUID = channelUID;
80         this.cachedValue = cachedValue;
81         this.readOnly = StringUtils.isBlank(config.commandTopic);
82     }
83
84     public boolean isReadOnly() {
85         return this.readOnly;
86     }
87
88     /**
89      * Add a transformation that is applied for each received MQTT topic value.
90      * The transformations are executed in order.
91      *
92      * @param transformation A transformation
93      */
94     public void addTransformation(ChannelStateTransformation transformation) {
95         transformationsIn.add(transformation);
96     }
97
98     /**
99      * Add a transformation that is applied for each value to be published.
100      * The transformations are executed in order.
101      *
102      * @param transformation A transformation
103      */
104     public void addTransformationOut(ChannelStateTransformation transformation) {
105         transformationsOut.add(transformation);
106     }
107
108     /**
109      * Clear transformations
110      */
111     public void clearTransformations() {
112         transformationsIn.clear();
113         transformationsOut.clear();
114     }
115
116     /**
117      * Returns the cached value state object of this message subscriber.
118      * <p>
119      * MQTT only notifies us once about a value, during the subscribe.
120      * The channel state therefore needs a cache for the current value.
121      * If MQTT has not yet published a value, the cache might still be in UNDEF state.
122      * </p>
123      */
124     public Value getCache() {
125         return cachedValue;
126     }
127
128     /**
129      * Return the channelUID
130      */
131     public ChannelUID channelUID() {
132         return channelUID;
133     }
134
135     /**
136      * Incoming message from the MqttBrokerConnection
137      *
138      * @param topic The topic. Is the same as the field stateTopic.
139      * @param payload The byte payload. Must be UTF8 encoded text or binary data.
140      */
141     @Override
142     public void processMessage(String topic, byte[] payload) {
143         final ChannelStateUpdateListener channelStateUpdateListener = this.channelStateUpdateListener;
144         if (channelStateUpdateListener == null) {
145             logger.warn("MQTT message received for topic {}, but MessageSubscriber object hasn't been started!", topic);
146             return;
147         }
148
149         if (cachedValue.isBinary()) {
150             cachedValue.update(payload);
151             channelStateUpdateListener.updateChannelState(channelUID, cachedValue.getChannelState());
152             receivedOrTimeout();
153             return;
154         }
155
156         // String value: Apply transformations
157         String strValue = new String(payload, StandardCharsets.UTF_8);
158         for (ChannelStateTransformation t : transformationsIn) {
159             String transformedValue = t.processValue(strValue);
160             if (transformedValue != null) {
161                 strValue = transformedValue;
162             } else {
163                 logger.debug("Transformation '{}' returned null on '{}', discarding message", strValue, t.serviceName);
164                 receivedOrTimeout();
165                 return;
166             }
167         }
168
169         // Is trigger?: Special handling
170         if (config.trigger) {
171             channelStateUpdateListener.triggerChannel(channelUID, strValue);
172             receivedOrTimeout();
173             return;
174         }
175
176         Command command = TypeParser.parseCommand(cachedValue.getSupportedCommandTypes(), strValue);
177         if (command == null) {
178             logger.warn("Incoming payload '{}' not supported by type '{}'", strValue,
179                     cachedValue.getClass().getSimpleName());
180             receivedOrTimeout();
181             return;
182         }
183
184         Command postOnlyCommand = cachedValue.isPostOnly(command);
185         if (postOnlyCommand != null) {
186             channelStateUpdateListener.postChannelCommand(channelUID, postOnlyCommand);
187             receivedOrTimeout();
188             return;
189         }
190
191         // Map the string to a command, update the cached value and post the command to the framework
192         try {
193             cachedValue.update(command);
194         } catch (IllegalArgumentException | IllegalStateException e) {
195             logger.warn("Command '{}' not supported by type '{}': {}", strValue, cachedValue.getClass().getSimpleName(),
196                     e.getMessage());
197             receivedOrTimeout();
198             return;
199         }
200
201         if (config.postCommand) {
202             channelStateUpdateListener.postChannelCommand(channelUID, (Command) cachedValue.getChannelState());
203         } else {
204             channelStateUpdateListener.updateChannelState(channelUID, cachedValue.getChannelState());
205         }
206         receivedOrTimeout();
207     }
208
209     /**
210      * Returns the state topic. Might be an empty string if this is a stateless channel (TRIGGER kind channel).
211      */
212     public String getStateTopic() {
213         return config.stateTopic;
214     }
215
216     /**
217      * Return the command topic. Might be an empty string, if this is a read-only channel.
218      */
219     public String getCommandTopic() {
220         return config.commandTopic;
221     }
222
223     /**
224      * Returns the channelType ID which also happens to be an item-type
225      */
226     public String getItemType() {
227         return cachedValue.getItemType();
228     }
229
230     /**
231      * Returns true if this is a stateful channel.
232      */
233     public boolean isStateful() {
234         return config.retained;
235     }
236
237     /**
238      * Removes the subscription to the state topic and resets the channelStateUpdateListener.
239      *
240      * @return A future that completes with true if unsubscribing from the state topic succeeded.
241      *         It completes with false if no connection is established and completes exceptionally otherwise.
242      */
243     public CompletableFuture<@Nullable Void> stop() {
244         final MqttBrokerConnection connection = this.connection;
245         if (connection != null && StringUtils.isNotBlank(config.stateTopic)) {
246             return connection.unsubscribe(config.stateTopic, this).thenRun(this::internalStop);
247         } else {
248             internalStop();
249             return CompletableFuture.completedFuture(null);
250         }
251     }
252
253     private void internalStop() {
254         logger.debug("Unsubscribed channel {} form topic: {}", this.channelUID, config.stateTopic);
255         this.connection = null;
256         this.channelStateUpdateListener = null;
257         hasSubscribed = false;
258         cachedValue.resetState();
259     }
260
261     private void receivedOrTimeout() {
262         final ScheduledFuture<?> scheduledFuture = this.scheduledFuture;
263         if (scheduledFuture != null) { // Cancel timeout
264             scheduledFuture.cancel(false);
265             this.scheduledFuture = null;
266         }
267         future.complete(null);
268     }
269
270     private @Nullable Void subscribeFail(Throwable e) {
271         final ScheduledFuture<?> scheduledFuture = this.scheduledFuture;
272         if (scheduledFuture != null) { // Cancel timeout
273             scheduledFuture.cancel(false);
274             this.scheduledFuture = null;
275         }
276         future.completeExceptionally(e);
277         return null;
278     }
279
280     /**
281      * Subscribes to the state topic on the given connection and informs about updates on the given listener.
282      *
283      * @param connection A broker connection
284      * @param scheduler A scheduler to realize the timeout
285      * @param timeout A timeout in milliseconds. Can be 0 to disable the timeout and let the future return earlier.
286      * @return A future that completes with true if the subscribing worked, with false if the stateTopic is not set
287      *         and exceptionally otherwise.
288      */
289     public CompletableFuture<@Nullable Void> start(MqttBrokerConnection connection, ScheduledExecutorService scheduler,
290             int timeout) {
291         synchronized (futureLock) {
292             // if the connection is still the same, the subscription is still present, otherwise we need to renew
293             if ((hasSubscribed || !future.isDone()) && connection.equals(this.connection)) {
294                 return future;
295             }
296             hasSubscribed = false;
297
298             this.connection = connection;
299
300             if (StringUtils.isBlank(config.stateTopic)) {
301                 return CompletableFuture.completedFuture(null);
302             }
303
304             this.future = new CompletableFuture<>();
305         }
306         connection.subscribe(config.stateTopic, this).thenRun(() -> {
307             hasSubscribed = true;
308             logger.debug("Subscribed channel {} to topic: {}", this.channelUID, config.stateTopic);
309             if (timeout > 0 && !future.isDone()) {
310                 this.scheduledFuture = scheduler.schedule(this::receivedOrTimeout, timeout, TimeUnit.MILLISECONDS);
311             } else {
312                 receivedOrTimeout();
313             }
314         }).exceptionally(this::subscribeFail);
315         return future;
316     }
317
318     /**
319      * Return true if this channel has subscribed to its MQTT topics.
320      * You need to call {@link #start(MqttBrokerConnection, ScheduledExecutorService, int)} and
321      * have a stateTopic set, to subscribe this channel.
322      */
323     public boolean hasSubscribed() {
324         return this.hasSubscribed;
325     }
326
327     /**
328      * Publishes a value on MQTT. A command topic needs to be set in the configuration.
329      *
330      * @param command The command to send
331      * @return A future that completes with true if the publishing worked and false if it is a readonly topic
332      *         and exceptionally otherwise.
333      */
334     public CompletableFuture<Boolean> publishValue(Command command) {
335         cachedValue.update(command);
336
337         Value mqttCommandValue = cachedValue;
338
339         final MqttBrokerConnection connection = this.connection;
340
341         if (connection == null) {
342             CompletableFuture<Boolean> f = new CompletableFuture<>();
343             f.completeExceptionally(new IllegalStateException(
344                     "The connection object has not been set. start() should have been called!"));
345             return f;
346         }
347
348         if (readOnly) {
349             logger.debug(
350                     "You have tried to publish {} to the mqtt topic '{}' that was marked read-only. You can't 'set' anything on a sensor state topic for example.",
351                     mqttCommandValue, config.commandTopic);
352             return CompletableFuture.completedFuture(false);
353         }
354
355         // Outgoing transformations
356         for (ChannelStateTransformation t : transformationsOut) {
357             String commandString = mqttCommandValue.getMQTTpublishValue(null);
358             String transformedValue = t.processValue(commandString);
359             if (transformedValue != null) {
360                 Value textValue = new TextValue();
361                 textValue.update(new StringType(transformedValue));
362                 mqttCommandValue = textValue;
363             } else {
364                 logger.debug("Transformation '{}' returned null on '{}', discarding message", mqttCommandValue,
365                         t.serviceName);
366                 return CompletableFuture.completedFuture(false);
367             }
368         }
369
370         String commandString;
371
372         // Formatter: Applied before the channel state value is published to the MQTT broker.
373         if (config.formatBeforePublish.length() > 0) {
374             try {
375                 commandString = mqttCommandValue.getMQTTpublishValue(config.formatBeforePublish);
376             } catch (IllegalFormatException e) {
377                 logger.debug("Format pattern incorrect for {}", channelUID, e);
378                 commandString = mqttCommandValue.getMQTTpublishValue(null);
379             }
380         } else {
381             commandString = mqttCommandValue.getMQTTpublishValue(null);
382         }
383
384         int qos = (config.qos != null) ? config.qos : connection.getQos();
385
386         return connection.publish(config.commandTopic, commandString.getBytes(), qos, config.retained);
387     }
388
389     /**
390      * @return The channelStateUpdateListener
391      */
392     public @Nullable ChannelStateUpdateListener getChannelStateUpdateListener() {
393         return channelStateUpdateListener;
394     }
395
396     /**
397      * @param channelStateUpdateListener The channelStateUpdateListener to set
398      */
399     public void setChannelStateUpdateListener(ChannelStateUpdateListener channelStateUpdateListener) {
400         this.channelStateUpdateListener = channelStateUpdateListener;
401     }
402
403     public @Nullable MqttBrokerConnection getConnection() {
404         return connection;
405     }
406
407     /**
408      * This is for tests only to inject a broker connection. Use
409      * {@link #start(MqttBrokerConnection, ScheduledExecutorService, int)} instead.
410      *
411      * @param connection MQTT Broker connection
412      */
413     public void setConnection(MqttBrokerConnection connection) {
414         this.connection = connection;
415     }
416 }