]> git.basschouten.com Git - openhab-addons.git/blob
e111f50e78b0b88a88c6734e62255ea901e50178
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2023 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.mqtt.generic;
14
15 import java.nio.charset.StandardCharsets;
16 import java.util.ArrayList;
17 import java.util.IllegalFormatException;
18 import java.util.List;
19 import java.util.concurrent.CompletableFuture;
20 import java.util.concurrent.ScheduledExecutorService;
21 import java.util.concurrent.ScheduledFuture;
22 import java.util.concurrent.TimeUnit;
23 import java.util.stream.Stream;
24
25 import org.eclipse.jdt.annotation.NonNullByDefault;
26 import org.eclipse.jdt.annotation.Nullable;
27 import org.openhab.binding.mqtt.generic.values.TextValue;
28 import org.openhab.binding.mqtt.generic.values.Value;
29 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
30 import org.openhab.core.io.transport.mqtt.MqttMessageSubscriber;
31 import org.openhab.core.library.types.StringType;
32 import org.openhab.core.thing.ChannelUID;
33 import org.openhab.core.types.Command;
34 import org.openhab.core.types.TypeParser;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
37
38 /**
39  * This object consists of a {@link Value}, which is updated on the respective MQTT topic change.
40  * Updates to the value are propagated via the {@link ChannelStateUpdateListener}.
41  *
42  * @author David Graeff - Initial contribution
43  */
44 @NonNullByDefault
45 public class ChannelState implements MqttMessageSubscriber {
46     private final Logger logger = LoggerFactory.getLogger(ChannelState.class);
47
48     // Immutable channel configuration
49     protected final boolean readOnly;
50     protected final ChannelUID channelUID;
51     protected final ChannelConfig config;
52
53     /** Channel value **/
54     protected final Value cachedValue;
55
56     // Runtime variables
57     private @Nullable MqttBrokerConnection connection;
58     protected final List<ChannelStateTransformation> transformationsIn = new ArrayList<>();
59     protected final List<ChannelStateTransformation> transformationsOut = new ArrayList<>();
60     private @Nullable ChannelStateUpdateListener channelStateUpdateListener;
61     protected boolean hasSubscribed = false;
62     private @Nullable ScheduledFuture<?> scheduledFuture;
63     private CompletableFuture<@Nullable Void> future = CompletableFuture.completedFuture(null);
64     private final Object futureLock = new Object();
65
66     /**
67      * Creates a new channel state.
68      *
69      * @param config The channel configuration
70      * @param channelUID The channelUID is used for the {@link ChannelStateUpdateListener} to notify about value changes
71      * @param cachedValue MQTT only notifies us once about a value, during the subscribe. The channel state therefore
72      *            needs a cache for the current value.
73      * @param channelStateUpdateListener A channel state update listener
74      */
75     public ChannelState(ChannelConfig config, ChannelUID channelUID, Value cachedValue,
76             @Nullable ChannelStateUpdateListener channelStateUpdateListener) {
77         this.config = config;
78         this.channelStateUpdateListener = channelStateUpdateListener;
79         this.channelUID = channelUID;
80         this.cachedValue = cachedValue;
81         this.readOnly = config.commandTopic.isBlank();
82     }
83
84     public boolean isReadOnly() {
85         return this.readOnly;
86     }
87
88     /**
89      * Add a transformation that is applied for each received MQTT topic value.
90      * The transformations are executed in order.
91      *
92      * @param transformation A transformation
93      */
94     public void addTransformation(ChannelStateTransformation transformation) {
95         transformationsIn.add(transformation);
96     }
97
98     public void addTransformation(String transformation, TransformationServiceProvider transformationServiceProvider) {
99         parseTransformation(transformation, transformationServiceProvider).forEach(t -> addTransformation(t));
100     }
101
102     /**
103      * Add a transformation that is applied for each value to be published.
104      * The transformations are executed in order.
105      *
106      * @param transformation A transformation
107      */
108     public void addTransformationOut(ChannelStateTransformation transformation) {
109         transformationsOut.add(transformation);
110     }
111
112     public void addTransformationOut(String transformation,
113             TransformationServiceProvider transformationServiceProvider) {
114         parseTransformation(transformation, transformationServiceProvider).forEach(t -> addTransformationOut(t));
115     }
116
117     public static Stream<ChannelStateTransformation> parseTransformation(String transformation,
118             TransformationServiceProvider transformationServiceProvider) {
119         String[] transformations = transformation.split("∩");
120         return Stream.of(transformations).filter(t -> !t.isBlank())
121                 .map(t -> new ChannelStateTransformation(t, transformationServiceProvider));
122     }
123
124     /**
125      * Clear transformations
126      */
127     public void clearTransformations() {
128         transformationsIn.clear();
129         transformationsOut.clear();
130     }
131
132     /**
133      * Returns the cached value state object of this message subscriber.
134      * <p>
135      * MQTT only notifies us once about a value, during the subscribe.
136      * The channel state therefore needs a cache for the current value.
137      * If MQTT has not yet published a value, the cache might still be in UNDEF state.
138      * </p>
139      */
140     public Value getCache() {
141         return cachedValue;
142     }
143
144     /**
145      * Return the channelUID
146      */
147     public ChannelUID channelUID() {
148         return channelUID;
149     }
150
151     /**
152      * Incoming message from the MqttBrokerConnection
153      *
154      * @param topic The topic. Is the same as the field stateTopic.
155      * @param payload The byte payload. Must be UTF8 encoded text or binary data.
156      */
157     @Override
158     public void processMessage(String topic, byte[] payload) {
159         final ChannelStateUpdateListener channelStateUpdateListener = this.channelStateUpdateListener;
160         if (channelStateUpdateListener == null) {
161             logger.warn("MQTT message received for topic {}, but MessageSubscriber object hasn't been started!", topic);
162             return;
163         }
164
165         if (cachedValue.isBinary()) {
166             cachedValue.update(payload);
167             channelStateUpdateListener.updateChannelState(channelUID, cachedValue.getChannelState());
168             receivedOrTimeout();
169             return;
170         }
171
172         // String value: Apply transformations
173         String strValue = new String(payload, StandardCharsets.UTF_8);
174         for (ChannelStateTransformation t : transformationsIn) {
175             String transformedValue = t.processValue(strValue);
176             if (transformedValue != null) {
177                 strValue = transformedValue;
178             } else {
179                 logger.debug("Transformation '{}' returned null on '{}', discarding message", strValue, t.serviceName);
180                 receivedOrTimeout();
181                 return;
182             }
183         }
184
185         // Is trigger?: Special handling
186         if (config.trigger) {
187             channelStateUpdateListener.triggerChannel(channelUID, strValue);
188             receivedOrTimeout();
189             return;
190         }
191
192         Command command = TypeParser.parseCommand(cachedValue.getSupportedCommandTypes(), strValue);
193         if (command == null) {
194             logger.warn("Incoming payload '{}' not supported by type '{}'", strValue,
195                     cachedValue.getClass().getSimpleName());
196             receivedOrTimeout();
197             return;
198         }
199
200         Command postOnlyCommand = cachedValue.isPostOnly(command);
201         if (postOnlyCommand != null) {
202             channelStateUpdateListener.postChannelCommand(channelUID, postOnlyCommand);
203             receivedOrTimeout();
204             return;
205         }
206
207         // Map the string to a command, update the cached value and post the command to the framework
208         try {
209             cachedValue.update(command);
210         } catch (IllegalArgumentException | IllegalStateException e) {
211             logger.warn("Command '{}' from channel '{}' not supported by type '{}': {}", strValue, channelUID,
212                     cachedValue.getClass().getSimpleName(), e.getMessage());
213             receivedOrTimeout();
214             return;
215         }
216
217         if (config.postCommand) {
218             channelStateUpdateListener.postChannelCommand(channelUID, (Command) cachedValue.getChannelState());
219         } else {
220             channelStateUpdateListener.updateChannelState(channelUID, cachedValue.getChannelState());
221         }
222         receivedOrTimeout();
223     }
224
225     /**
226      * Returns the state topic. Might be an empty string if this is a stateless channel (TRIGGER kind channel).
227      */
228     public String getStateTopic() {
229         return config.stateTopic;
230     }
231
232     /**
233      * Return the command topic. Might be an empty string, if this is a read-only channel.
234      */
235     public String getCommandTopic() {
236         return config.commandTopic;
237     }
238
239     /**
240      * Returns the channelType ID which also happens to be an item-type
241      */
242     public String getItemType() {
243         return cachedValue.getItemType();
244     }
245
246     /**
247      * Returns true if this is a stateful channel.
248      */
249     public boolean isStateful() {
250         return config.retained;
251     }
252
253     /**
254      * Removes the subscription to the state topic and resets the channelStateUpdateListener.
255      *
256      * @return A future that completes with true if unsubscribing from the state topic succeeded.
257      *         It completes with false if no connection is established and completes exceptionally otherwise.
258      */
259     public CompletableFuture<@Nullable Void> stop() {
260         final MqttBrokerConnection connection = this.connection;
261         if (connection != null && !config.stateTopic.isBlank()) {
262             return connection.unsubscribe(config.stateTopic, this).thenRun(this::internalStop);
263         } else {
264             internalStop();
265             return CompletableFuture.completedFuture(null);
266         }
267     }
268
269     private void internalStop() {
270         logger.debug("Unsubscribed channel {} from topic: {}", this.channelUID, config.stateTopic);
271         this.connection = null;
272         this.channelStateUpdateListener = null;
273         hasSubscribed = false;
274         cachedValue.resetState();
275     }
276
277     private void receivedOrTimeout() {
278         final ScheduledFuture<?> scheduledFuture = this.scheduledFuture;
279         if (scheduledFuture != null) { // Cancel timeout
280             scheduledFuture.cancel(false);
281             this.scheduledFuture = null;
282         }
283         future.complete(null);
284     }
285
286     private @Nullable Void subscribeFail(Throwable e) {
287         final ScheduledFuture<?> scheduledFuture = this.scheduledFuture;
288         if (scheduledFuture != null) { // Cancel timeout
289             scheduledFuture.cancel(false);
290             this.scheduledFuture = null;
291         }
292         future.completeExceptionally(e);
293         return null;
294     }
295
296     /**
297      * Subscribes to the state topic on the given connection and informs about updates on the given listener.
298      *
299      * @param connection A broker connection
300      * @param scheduler A scheduler to realize the timeout
301      * @param timeout A timeout in milliseconds. Can be 0 to disable the timeout and let the future return earlier.
302      * @return A future that completes with true if the subscribing worked, with false if the stateTopic is not set
303      *         and exceptionally otherwise.
304      */
305     public CompletableFuture<@Nullable Void> start(MqttBrokerConnection connection, ScheduledExecutorService scheduler,
306             int timeout) {
307         synchronized (futureLock) {
308             // if the connection is still the same, the subscription is still present, otherwise we need to renew
309             if ((hasSubscribed || !future.isDone()) && connection.equals(this.connection)) {
310                 return future;
311             }
312             hasSubscribed = false;
313
314             this.connection = connection;
315
316             if (config.stateTopic.isBlank()) {
317                 return CompletableFuture.completedFuture(null);
318             }
319
320             this.future = new CompletableFuture<>();
321         }
322         connection.subscribe(config.stateTopic, this).thenRun(() -> {
323             hasSubscribed = true;
324             logger.debug("Subscribed channel {} to topic: {}", this.channelUID, config.stateTopic);
325             if (timeout > 0 && !future.isDone()) {
326                 this.scheduledFuture = scheduler.schedule(this::receivedOrTimeout, timeout, TimeUnit.MILLISECONDS);
327             } else {
328                 receivedOrTimeout();
329             }
330         }).exceptionally(this::subscribeFail);
331         return future;
332     }
333
334     /**
335      * Return true if this channel has subscribed to its MQTT topics.
336      * You need to call {@link #start(MqttBrokerConnection, ScheduledExecutorService, int)} and
337      * have a stateTopic set, to subscribe this channel.
338      */
339     public boolean hasSubscribed() {
340         return this.hasSubscribed;
341     }
342
343     /**
344      * Publishes a value on MQTT. A command topic needs to be set in the configuration.
345      *
346      * @param command The command to send
347      * @return A future that completes with true if the publishing worked and false if it is a readonly topic
348      *         and exceptionally otherwise.
349      */
350     public CompletableFuture<Boolean> publishValue(Command command) {
351         cachedValue.update(command);
352
353         Value mqttCommandValue = cachedValue;
354
355         final MqttBrokerConnection connection = this.connection;
356
357         if (connection == null) {
358             CompletableFuture<Boolean> f = new CompletableFuture<>();
359             f.completeExceptionally(new IllegalStateException(
360                     "The connection object has not been set. start() should have been called!"));
361             return f;
362         }
363
364         if (readOnly) {
365             logger.debug(
366                     "You have tried to publish {} to the mqtt topic '{}' that was marked read-only. You can't 'set' anything on a sensor state topic for example.",
367                     mqttCommandValue, config.commandTopic);
368             return CompletableFuture.completedFuture(false);
369         }
370
371         // Outgoing transformations
372         for (ChannelStateTransformation t : transformationsOut) {
373             String commandString = mqttCommandValue.getMQTTpublishValue(null);
374             String transformedValue = t.processValue(commandString);
375             if (transformedValue != null) {
376                 Value textValue = new TextValue();
377                 textValue.update(new StringType(transformedValue));
378                 mqttCommandValue = textValue;
379             } else {
380                 logger.debug("Transformation '{}' returned null on '{}', discarding message", mqttCommandValue,
381                         t.serviceName);
382                 return CompletableFuture.completedFuture(false);
383             }
384         }
385
386         String commandString;
387
388         // Formatter: Applied before the channel state value is published to the MQTT broker.
389         if (config.formatBeforePublish.length() > 0) {
390             try {
391                 commandString = mqttCommandValue.getMQTTpublishValue(config.formatBeforePublish);
392             } catch (IllegalFormatException e) {
393                 logger.debug("Format pattern incorrect for {}", channelUID, e);
394                 commandString = mqttCommandValue.getMQTTpublishValue(null);
395             }
396         } else {
397             commandString = mqttCommandValue.getMQTTpublishValue(null);
398         }
399
400         int qos = (config.qos != null) ? config.qos : connection.getQos();
401
402         return connection.publish(config.commandTopic, commandString.getBytes(), qos, config.retained);
403     }
404
405     /**
406      * @return The channelStateUpdateListener
407      */
408     public @Nullable ChannelStateUpdateListener getChannelStateUpdateListener() {
409         return channelStateUpdateListener;
410     }
411
412     /**
413      * @param channelStateUpdateListener The channelStateUpdateListener to set
414      */
415     public void setChannelStateUpdateListener(ChannelStateUpdateListener channelStateUpdateListener) {
416         this.channelStateUpdateListener = channelStateUpdateListener;
417     }
418
419     public @Nullable MqttBrokerConnection getConnection() {
420         return connection;
421     }
422
423     /**
424      * This is for tests only to inject a broker connection. Use
425      * {@link #start(MqttBrokerConnection, ScheduledExecutorService, int)} instead.
426      *
427      * @param connection MQTT Broker connection
428      */
429     public void setConnection(MqttBrokerConnection connection) {
430         this.connection = connection;
431     }
432 }