]> git.basschouten.com Git - openhab-addons.git/blob
607253f17bd8245431661a5126a8333a96068876
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2023 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.mqtt.generic;
14
15 import java.nio.charset.StandardCharsets;
16 import java.util.ArrayList;
17 import java.util.IllegalFormatException;
18 import java.util.List;
19 import java.util.concurrent.CompletableFuture;
20 import java.util.concurrent.ScheduledExecutorService;
21 import java.util.concurrent.ScheduledFuture;
22 import java.util.concurrent.TimeUnit;
23 import java.util.stream.Stream;
24
25 import org.eclipse.jdt.annotation.NonNullByDefault;
26 import org.eclipse.jdt.annotation.Nullable;
27 import org.openhab.binding.mqtt.generic.values.TextValue;
28 import org.openhab.binding.mqtt.generic.values.Value;
29 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
30 import org.openhab.core.io.transport.mqtt.MqttMessageSubscriber;
31 import org.openhab.core.library.types.StringType;
32 import org.openhab.core.thing.ChannelUID;
33 import org.openhab.core.types.Command;
34 import org.openhab.core.types.State;
35 import org.openhab.core.types.TypeParser;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
38
39 /**
40  * This object consists of a {@link Value}, which is updated on the respective MQTT topic change.
41  * Updates to the value are propagated via the {@link ChannelStateUpdateListener}.
42  *
43  * @author David Graeff - Initial contribution
44  */
45 @NonNullByDefault
46 public class ChannelState implements MqttMessageSubscriber {
47     private final Logger logger = LoggerFactory.getLogger(ChannelState.class);
48
49     // Immutable channel configuration
50     protected final boolean readOnly;
51     protected final ChannelUID channelUID;
52     protected final ChannelConfig config;
53
54     /** Channel value **/
55     protected final Value cachedValue;
56
57     // Runtime variables
58     private @Nullable MqttBrokerConnection connection;
59     protected final List<ChannelStateTransformation> transformationsIn = new ArrayList<>();
60     protected final List<ChannelStateTransformation> transformationsOut = new ArrayList<>();
61     private @Nullable ChannelStateUpdateListener channelStateUpdateListener;
62     protected boolean hasSubscribed = false;
63     private @Nullable ScheduledFuture<?> scheduledFuture;
64     private CompletableFuture<@Nullable Void> future = CompletableFuture.completedFuture(null);
65     private final Object futureLock = new Object();
66
67     /**
68      * Creates a new channel state.
69      *
70      * @param config The channel configuration
71      * @param channelUID The channelUID is used for the {@link ChannelStateUpdateListener} to notify about value changes
72      * @param cachedValue MQTT only notifies us once about a value, during the subscribe. The channel state therefore
73      *            needs a cache for the current value.
74      * @param channelStateUpdateListener A channel state update listener
75      */
76     public ChannelState(ChannelConfig config, ChannelUID channelUID, Value cachedValue,
77             @Nullable ChannelStateUpdateListener channelStateUpdateListener) {
78         this.config = config;
79         this.channelStateUpdateListener = channelStateUpdateListener;
80         this.channelUID = channelUID;
81         this.cachedValue = cachedValue;
82         this.readOnly = config.commandTopic.isBlank();
83     }
84
85     public boolean isReadOnly() {
86         return this.readOnly;
87     }
88
89     /**
90      * Add a transformation that is applied for each received MQTT topic value.
91      * The transformations are executed in order.
92      *
93      * @param transformation A transformation
94      */
95     public void addTransformation(ChannelStateTransformation transformation) {
96         transformationsIn.add(transformation);
97     }
98
99     public void addTransformation(String transformation, TransformationServiceProvider transformationServiceProvider) {
100         parseTransformation(transformation, transformationServiceProvider).forEach(t -> addTransformation(t));
101     }
102
103     /**
104      * Add a transformation that is applied for each value to be published.
105      * The transformations are executed in order.
106      *
107      * @param transformation A transformation
108      */
109     public void addTransformationOut(ChannelStateTransformation transformation) {
110         transformationsOut.add(transformation);
111     }
112
113     public void addTransformationOut(String transformation,
114             TransformationServiceProvider transformationServiceProvider) {
115         parseTransformation(transformation, transformationServiceProvider).forEach(t -> addTransformationOut(t));
116     }
117
118     public static Stream<ChannelStateTransformation> parseTransformation(String transformation,
119             TransformationServiceProvider transformationServiceProvider) {
120         String[] transformations = transformation.split("∩");
121         return Stream.of(transformations).filter(t -> !t.isBlank())
122                 .map(t -> new ChannelStateTransformation(t, transformationServiceProvider));
123     }
124
125     /**
126      * Clear transformations
127      */
128     public void clearTransformations() {
129         transformationsIn.clear();
130         transformationsOut.clear();
131     }
132
133     /**
134      * Returns the cached value state object of this message subscriber.
135      * <p>
136      * MQTT only notifies us once about a value, during the subscribe.
137      * The channel state therefore needs a cache for the current value.
138      * If MQTT has not yet published a value, the cache might still be in UNDEF state.
139      * </p>
140      */
141     public Value getCache() {
142         return cachedValue;
143     }
144
145     /**
146      * Return the channelUID
147      */
148     public ChannelUID channelUID() {
149         return channelUID;
150     }
151
152     /**
153      * Incoming message from the MqttBrokerConnection
154      *
155      * @param topic The topic. Is the same as the field stateTopic.
156      * @param payload The byte payload. Must be UTF8 encoded text or binary data.
157      */
158     @Override
159     public void processMessage(String topic, byte[] payload) {
160         final ChannelStateUpdateListener channelStateUpdateListener = this.channelStateUpdateListener;
161         if (channelStateUpdateListener == null) {
162             logger.warn("MQTT message received for topic {}, but MessageSubscriber object hasn't been started!", topic);
163             return;
164         }
165
166         if (cachedValue.isBinary()) {
167             cachedValue.update(payload);
168             channelStateUpdateListener.updateChannelState(channelUID, cachedValue.getChannelState());
169             receivedOrTimeout();
170             return;
171         }
172
173         // String value: Apply transformations
174         String strValue = new String(payload, StandardCharsets.UTF_8);
175         for (ChannelStateTransformation t : transformationsIn) {
176             String transformedValue = t.processValue(strValue);
177             if (transformedValue != null) {
178                 strValue = transformedValue;
179             } else {
180                 logger.debug("Transformation '{}' returned null on '{}', discarding message", strValue, t.serviceName);
181                 receivedOrTimeout();
182                 return;
183             }
184         }
185
186         // Is trigger?: Special handling
187         if (config.trigger) {
188             channelStateUpdateListener.triggerChannel(channelUID, strValue);
189             receivedOrTimeout();
190             return;
191         }
192
193         Command command = TypeParser.parseCommand(cachedValue.getSupportedCommandTypes(), strValue);
194         if (command == null) {
195             logger.warn("Incoming payload '{}' on '{}' not supported by type '{}'", strValue, topic,
196                     cachedValue.getClass().getSimpleName());
197             receivedOrTimeout();
198             return;
199         }
200
201         Command parsedCommand;
202         // Map the string to a command, update the cached value and post the command to the framework
203         try {
204             parsedCommand = cachedValue.parseMessage(command);
205         } catch (IllegalArgumentException | IllegalStateException e) {
206             logger.warn("Command '{}' from channel '{}' not supported by type '{}': {}", strValue, channelUID,
207                     cachedValue.getClass().getSimpleName(), e.getMessage());
208             receivedOrTimeout();
209             return;
210         }
211
212         // things that are only Commands _must_ be posted as a command (like STOP)
213         if (!(parsedCommand instanceof State)) {
214             channelStateUpdateListener.postChannelCommand(channelUID, parsedCommand);
215             receivedOrTimeout();
216             return;
217         }
218         cachedValue.update((State) parsedCommand);
219
220         if (config.postCommand) {
221             channelStateUpdateListener.postChannelCommand(channelUID, (Command) cachedValue.getChannelState());
222         } else {
223             channelStateUpdateListener.updateChannelState(channelUID, cachedValue.getChannelState());
224         }
225         receivedOrTimeout();
226     }
227
228     /**
229      * Returns the state topic. Might be an empty string if this is a stateless channel (TRIGGER kind channel).
230      */
231     public String getStateTopic() {
232         return config.stateTopic;
233     }
234
235     /**
236      * Return the command topic. Might be an empty string, if this is a read-only channel.
237      */
238     public String getCommandTopic() {
239         return config.commandTopic;
240     }
241
242     /**
243      * Returns the channelType ID which also happens to be an item-type
244      */
245     public String getItemType() {
246         return cachedValue.getItemType();
247     }
248
249     /**
250      * Returns true if this is a stateful channel.
251      */
252     public boolean isStateful() {
253         return config.retained;
254     }
255
256     /**
257      * Removes the subscription to the state topic and resets the channelStateUpdateListener.
258      *
259      * @return A future that completes with true if unsubscribing from the state topic succeeded.
260      *         It completes with false if no connection is established and completes exceptionally otherwise.
261      */
262     public CompletableFuture<@Nullable Void> stop() {
263         final MqttBrokerConnection connection = this.connection;
264         if (connection != null && !config.stateTopic.isBlank()) {
265             return connection.unsubscribe(config.stateTopic, this).thenRun(this::internalStop);
266         } else {
267             internalStop();
268             return CompletableFuture.completedFuture(null);
269         }
270     }
271
272     private void internalStop() {
273         logger.debug("Unsubscribed channel {} from topic: {}", this.channelUID, config.stateTopic);
274         this.connection = null;
275         this.channelStateUpdateListener = null;
276         hasSubscribed = false;
277         cachedValue.resetState();
278     }
279
280     private void receivedOrTimeout() {
281         final ScheduledFuture<?> scheduledFuture = this.scheduledFuture;
282         if (scheduledFuture != null) { // Cancel timeout
283             scheduledFuture.cancel(false);
284             this.scheduledFuture = null;
285         }
286         future.complete(null);
287     }
288
289     private @Nullable Void subscribeFail(Throwable e) {
290         final ScheduledFuture<?> scheduledFuture = this.scheduledFuture;
291         if (scheduledFuture != null) { // Cancel timeout
292             scheduledFuture.cancel(false);
293             this.scheduledFuture = null;
294         }
295         future.completeExceptionally(e);
296         return null;
297     }
298
299     /**
300      * Subscribes to the state topic on the given connection and informs about updates on the given listener.
301      *
302      * @param connection A broker connection
303      * @param scheduler A scheduler to realize the timeout
304      * @param timeout A timeout in milliseconds. Can be 0 to disable the timeout and let the future return earlier.
305      * @return A future that completes with true if the subscribing worked, with false if the stateTopic is not set
306      *         and exceptionally otherwise.
307      */
308     public CompletableFuture<@Nullable Void> start(MqttBrokerConnection connection, ScheduledExecutorService scheduler,
309             int timeout) {
310         synchronized (futureLock) {
311             // if the connection is still the same, the subscription is still present, otherwise we need to renew
312             if ((hasSubscribed || !future.isDone()) && connection.equals(this.connection)) {
313                 return future;
314             }
315             hasSubscribed = false;
316
317             this.connection = connection;
318
319             if (config.stateTopic.isBlank()) {
320                 return CompletableFuture.completedFuture(null);
321             }
322
323             this.future = new CompletableFuture<>();
324         }
325         connection.subscribe(config.stateTopic, this).thenRun(() -> {
326             hasSubscribed = true;
327             logger.debug("Subscribed channel {} to topic: {}", this.channelUID, config.stateTopic);
328             if (timeout > 0 && !future.isDone()) {
329                 this.scheduledFuture = scheduler.schedule(this::receivedOrTimeout, timeout, TimeUnit.MILLISECONDS);
330             } else {
331                 receivedOrTimeout();
332             }
333         }).exceptionally(this::subscribeFail);
334         return future;
335     }
336
337     /**
338      * Return true if this channel has subscribed to its MQTT topics.
339      * You need to call {@link #start(MqttBrokerConnection, ScheduledExecutorService, int)} and
340      * have a stateTopic set, to subscribe this channel.
341      */
342     public boolean hasSubscribed() {
343         return this.hasSubscribed;
344     }
345
346     /**
347      * Publishes a value on MQTT. A command topic needs to be set in the configuration.
348      *
349      * @param command The command to send
350      * @return A future that completes with true if the publishing worked and false if it is a readonly topic
351      *         and exceptionally otherwise.
352      */
353     public CompletableFuture<Boolean> publishValue(Command command) {
354         final MqttBrokerConnection connection = this.connection;
355
356         if (connection == null) {
357             CompletableFuture<Boolean> f = new CompletableFuture<>();
358             f.completeExceptionally(new IllegalStateException(
359                     "The connection object has not been set. start() should have been called!"));
360             return f;
361         }
362
363         Command mqttCommandValue = cachedValue.parseCommand(command);
364         Value mqttFormatter = cachedValue;
365
366         if (readOnly) {
367             logger.debug(
368                     "You have tried to publish {} to the mqtt topic '{}' that was marked read-only. You can't 'set' anything on a sensor state topic for example.",
369                     mqttCommandValue, config.commandTopic);
370             return CompletableFuture.completedFuture(false);
371         }
372
373         // Outgoing transformations
374         for (ChannelStateTransformation t : transformationsOut) {
375             String commandString = mqttFormatter.getMQTTpublishValue(mqttCommandValue, null);
376             String transformedValue = t.processValue(commandString);
377             if (transformedValue != null) {
378                 mqttFormatter = new TextValue();
379                 mqttCommandValue = new StringType(transformedValue);
380             } else {
381                 logger.debug("Transformation '{}' returned null on '{}', discarding message", mqttCommandValue,
382                         t.serviceName);
383                 return CompletableFuture.completedFuture(false);
384             }
385         }
386
387         String commandString;
388
389         // Formatter: Applied before the channel state value is published to the MQTT broker.
390         if (config.formatBeforePublish.length() > 0) {
391             try {
392                 commandString = mqttFormatter.getMQTTpublishValue(mqttCommandValue, config.formatBeforePublish);
393             } catch (IllegalFormatException e) {
394                 logger.debug("Format pattern incorrect for {}", channelUID, e);
395                 commandString = mqttFormatter.getMQTTpublishValue(mqttCommandValue, null);
396             }
397         } else {
398             commandString = mqttFormatter.getMQTTpublishValue(mqttCommandValue, null);
399         }
400
401         int qos = (config.qos != null) ? config.qos : connection.getQos();
402
403         return connection.publish(config.commandTopic, commandString.getBytes(), qos, config.retained);
404     }
405
406     /**
407      * @return The channelStateUpdateListener
408      */
409     public @Nullable ChannelStateUpdateListener getChannelStateUpdateListener() {
410         return channelStateUpdateListener;
411     }
412
413     /**
414      * @param channelStateUpdateListener The channelStateUpdateListener to set
415      */
416     public void setChannelStateUpdateListener(ChannelStateUpdateListener channelStateUpdateListener) {
417         this.channelStateUpdateListener = channelStateUpdateListener;
418     }
419
420     public @Nullable MqttBrokerConnection getConnection() {
421         return connection;
422     }
423
424     /**
425      * This is for tests only to inject a broker connection. Use
426      * {@link #start(MqttBrokerConnection, ScheduledExecutorService, int)} instead.
427      *
428      * @param connection MQTT Broker connection
429      */
430     public void setConnection(MqttBrokerConnection connection) {
431         this.connection = connection;
432     }
433 }