]> git.basschouten.com Git - openhab-addons.git/blob
41482b93e2962a8a5ab1c6d6df152938d2bfa631
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2024 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.mqtt.generic;
14
15 import java.nio.charset.StandardCharsets;
16 import java.util.ArrayList;
17 import java.util.IllegalFormatException;
18 import java.util.List;
19 import java.util.concurrent.CompletableFuture;
20 import java.util.concurrent.ScheduledExecutorService;
21 import java.util.concurrent.ScheduledFuture;
22 import java.util.concurrent.TimeUnit;
23 import java.util.stream.Stream;
24
25 import org.eclipse.jdt.annotation.NonNullByDefault;
26 import org.eclipse.jdt.annotation.Nullable;
27 import org.openhab.binding.mqtt.generic.values.TextValue;
28 import org.openhab.binding.mqtt.generic.values.Value;
29 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
30 import org.openhab.core.io.transport.mqtt.MqttMessageSubscriber;
31 import org.openhab.core.library.types.StringType;
32 import org.openhab.core.thing.ChannelUID;
33 import org.openhab.core.types.Command;
34 import org.openhab.core.types.State;
35 import org.openhab.core.types.Type;
36 import org.openhab.core.types.TypeParser;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
39
40 /**
41  * This object consists of a {@link Value}, which is updated on the respective MQTT topic change.
42  * Updates to the value are propagated via the {@link ChannelStateUpdateListener}.
43  *
44  * @author David Graeff - Initial contribution
45  */
46 @NonNullByDefault
47 public class ChannelState implements MqttMessageSubscriber {
48     private final Logger logger = LoggerFactory.getLogger(ChannelState.class);
49
50     // Immutable channel configuration
51     protected final boolean readOnly;
52     protected final ChannelUID channelUID;
53     protected final ChannelConfig config;
54
55     /** Channel value **/
56     protected final Value cachedValue;
57
58     // Runtime variables
59     private @Nullable MqttBrokerConnection connection;
60     protected final List<ChannelStateTransformation> transformationsIn = new ArrayList<>();
61     protected final List<ChannelStateTransformation> transformationsOut = new ArrayList<>();
62     private @Nullable ChannelStateUpdateListener channelStateUpdateListener;
63     protected boolean hasSubscribed = false;
64     private @Nullable ScheduledFuture<?> scheduledFuture;
65     private CompletableFuture<@Nullable Void> future = CompletableFuture.completedFuture(null);
66     private final Object futureLock = new Object();
67
68     /**
69      * Creates a new channel state.
70      *
71      * @param config The channel configuration
72      * @param channelUID The channelUID is used for the {@link ChannelStateUpdateListener} to notify about value changes
73      * @param cachedValue MQTT only notifies us once about a value, during the subscribe. The channel state therefore
74      *            needs a cache for the current value.
75      * @param channelStateUpdateListener A channel state update listener
76      */
77     public ChannelState(ChannelConfig config, ChannelUID channelUID, Value cachedValue,
78             @Nullable ChannelStateUpdateListener channelStateUpdateListener) {
79         this.config = config;
80         this.channelStateUpdateListener = channelStateUpdateListener;
81         this.channelUID = channelUID;
82         this.cachedValue = cachedValue;
83         this.readOnly = config.commandTopic.isBlank();
84     }
85
86     public boolean isReadOnly() {
87         return this.readOnly;
88     }
89
90     /**
91      * Add a transformation that is applied for each received MQTT topic value.
92      * The transformations are executed in order.
93      *
94      * @param transformation A transformation
95      */
96     public void addTransformation(ChannelStateTransformation transformation) {
97         transformationsIn.add(transformation);
98     }
99
100     public void addTransformation(String transformation, TransformationServiceProvider transformationServiceProvider) {
101         parseTransformation(transformation, transformationServiceProvider).forEach(t -> addTransformation(t));
102     }
103
104     /**
105      * Add a transformation that is applied for each value to be published.
106      * The transformations are executed in order.
107      *
108      * @param transformation A transformation
109      */
110     public void addTransformationOut(ChannelStateTransformation transformation) {
111         transformationsOut.add(transformation);
112     }
113
114     public void addTransformationOut(String transformation,
115             TransformationServiceProvider transformationServiceProvider) {
116         parseTransformation(transformation, transformationServiceProvider).forEach(t -> addTransformationOut(t));
117     }
118
119     public static Stream<ChannelStateTransformation> parseTransformation(String transformation,
120             TransformationServiceProvider transformationServiceProvider) {
121         String[] transformations = transformation.split("∩");
122         return Stream.of(transformations).filter(t -> !t.isBlank())
123                 .map(t -> new ChannelStateTransformation(t, transformationServiceProvider));
124     }
125
126     /**
127      * Clear transformations
128      */
129     public void clearTransformations() {
130         transformationsIn.clear();
131         transformationsOut.clear();
132     }
133
134     /**
135      * Returns the cached value state object of this message subscriber.
136      * <p>
137      * MQTT only notifies us once about a value, during the subscribe.
138      * The channel state therefore needs a cache for the current value.
139      * If MQTT has not yet published a value, the cache might still be in UNDEF state.
140      * </p>
141      */
142     public Value getCache() {
143         return cachedValue;
144     }
145
146     /**
147      * Return the channelUID
148      */
149     public ChannelUID channelUID() {
150         return channelUID;
151     }
152
153     /**
154      * Incoming message from the MqttBrokerConnection
155      *
156      * @param topic The topic. Is the same as the field stateTopic.
157      * @param payload The byte payload. Must be UTF8 encoded text or binary data.
158      */
159     @Override
160     public void processMessage(String topic, byte[] payload) {
161         final ChannelStateUpdateListener channelStateUpdateListener = this.channelStateUpdateListener;
162         if (channelStateUpdateListener == null) {
163             logger.warn("MQTT message received for topic {}, but MessageSubscriber object hasn't been started!", topic);
164             return;
165         }
166
167         if (cachedValue.isBinary()) {
168             cachedValue.update(payload);
169             channelStateUpdateListener.updateChannelState(channelUID, cachedValue.getChannelState());
170             receivedOrTimeout();
171             return;
172         }
173
174         // String value: Apply transformations
175         String strValue = new String(payload, StandardCharsets.UTF_8);
176         for (ChannelStateTransformation t : transformationsIn) {
177             String transformedValue = t.processValue(strValue);
178             if (transformedValue != null) {
179                 strValue = transformedValue;
180             } else {
181                 logger.debug("Transformation '{}' returned null on '{}', discarding message", strValue, t.serviceName);
182                 receivedOrTimeout();
183                 return;
184             }
185         }
186
187         // Is trigger?: Special handling
188         if (config.trigger) {
189             channelStateUpdateListener.triggerChannel(channelUID, strValue);
190             receivedOrTimeout();
191             return;
192         }
193
194         Command command = TypeParser.parseCommand(cachedValue.getSupportedCommandTypes(), strValue);
195         if (command == null) {
196             logger.warn("Incoming payload '{}' on '{}' not supported by type '{}'", strValue, topic,
197                     cachedValue.getClass().getSimpleName());
198             receivedOrTimeout();
199             return;
200         }
201
202         Type parsedType;
203         // Map the string to a command, update the cached value and post the command to the framework
204         try {
205             parsedType = cachedValue.parseMessage(command);
206         } catch (IllegalArgumentException | IllegalStateException e) {
207             logger.warn("Command '{}' from channel '{}' not supported by type '{}': {}", strValue, channelUID,
208                     cachedValue.getClass().getSimpleName(), e.getMessage());
209             receivedOrTimeout();
210             return;
211         }
212
213         if (parsedType instanceof State parsedState) {
214             cachedValue.update(parsedState);
215         } else {
216             // things that are only Commands _must_ be posted as a command (like STOP)
217             channelStateUpdateListener.postChannelCommand(channelUID, (Command) parsedType);
218             receivedOrTimeout();
219             return;
220         }
221
222         State newState = cachedValue.getChannelState();
223         // If the user explicitly wants a command sent, not an update, do that. But
224         // we have to check that the state is even possible to send as a command
225         // (i.e. not UNDEF)
226         if (config.postCommand && newState instanceof Command newCommand) {
227             channelStateUpdateListener.postChannelCommand(channelUID, newCommand);
228         } else {
229             channelStateUpdateListener.updateChannelState(channelUID, newState);
230         }
231         receivedOrTimeout();
232     }
233
234     /**
235      * Returns the state topic. Might be an empty string if this is a stateless channel (TRIGGER kind channel).
236      */
237     public String getStateTopic() {
238         return config.stateTopic;
239     }
240
241     /**
242      * Return the command topic. Might be an empty string, if this is a read-only channel.
243      */
244     public String getCommandTopic() {
245         return config.commandTopic;
246     }
247
248     /**
249      * Returns the channelType ID which also happens to be an item-type
250      */
251     public String getItemType() {
252         return cachedValue.getItemType();
253     }
254
255     /**
256      * Returns true if this is a stateful channel.
257      */
258     public boolean isStateful() {
259         return config.retained;
260     }
261
262     /**
263      * Removes the subscription to the state topic and resets the channelStateUpdateListener.
264      *
265      * @return A future that completes with true if unsubscribing from the state topic succeeded.
266      *         It completes with false if no connection is established and completes exceptionally otherwise.
267      */
268     public CompletableFuture<@Nullable Void> stop() {
269         final MqttBrokerConnection connection = this.connection;
270         if (connection != null && !config.stateTopic.isBlank()) {
271             return connection.unsubscribe(config.stateTopic, this).thenRun(this::internalStop);
272         } else {
273             internalStop();
274             return CompletableFuture.completedFuture(null);
275         }
276     }
277
278     private void internalStop() {
279         logger.debug("Unsubscribed channel {} from topic: {}", this.channelUID, config.stateTopic);
280         this.connection = null;
281         this.channelStateUpdateListener = null;
282         hasSubscribed = false;
283         cachedValue.resetState();
284     }
285
286     private void receivedOrTimeout() {
287         final ScheduledFuture<?> scheduledFuture = this.scheduledFuture;
288         if (scheduledFuture != null) { // Cancel timeout
289             scheduledFuture.cancel(false);
290             this.scheduledFuture = null;
291         }
292         future.complete(null);
293     }
294
295     private @Nullable Void subscribeFail(Throwable e) {
296         final ScheduledFuture<?> scheduledFuture = this.scheduledFuture;
297         if (scheduledFuture != null) { // Cancel timeout
298             scheduledFuture.cancel(false);
299             this.scheduledFuture = null;
300         }
301         future.completeExceptionally(e);
302         return null;
303     }
304
305     /**
306      * Subscribes to the state topic on the given connection and informs about updates on the given listener.
307      *
308      * @param connection A broker connection
309      * @param scheduler A scheduler to realize the timeout
310      * @param timeout A timeout in milliseconds. Can be 0 to disable the timeout and let the future return earlier.
311      * @return A future that completes with true if the subscribing worked, with false if the stateTopic is not set
312      *         and exceptionally otherwise.
313      */
314     public CompletableFuture<@Nullable Void> start(MqttBrokerConnection connection, ScheduledExecutorService scheduler,
315             int timeout) {
316         synchronized (futureLock) {
317             // if the connection is still the same, the subscription is still present, otherwise we need to renew
318             if ((hasSubscribed || !future.isDone()) && connection.equals(this.connection)) {
319                 return future;
320             }
321             hasSubscribed = false;
322
323             this.connection = connection;
324
325             if (config.stateTopic.isBlank()) {
326                 return CompletableFuture.completedFuture(null);
327             }
328
329             this.future = new CompletableFuture<>();
330         }
331         connection.subscribe(config.stateTopic, this).thenRun(() -> {
332             hasSubscribed = true;
333             logger.debug("Subscribed channel {} to topic: {}", this.channelUID, config.stateTopic);
334             if (timeout > 0 && !future.isDone()) {
335                 this.scheduledFuture = scheduler.schedule(this::receivedOrTimeout, timeout, TimeUnit.MILLISECONDS);
336             } else {
337                 receivedOrTimeout();
338             }
339         }).exceptionally(this::subscribeFail);
340         return future;
341     }
342
343     /**
344      * Return true if this channel has subscribed to its MQTT topics.
345      * You need to call {@link #start(MqttBrokerConnection, ScheduledExecutorService, int)} and
346      * have a stateTopic set, to subscribe this channel.
347      */
348     public boolean hasSubscribed() {
349         return this.hasSubscribed;
350     }
351
352     /**
353      * Publishes a value on MQTT. A command topic needs to be set in the configuration.
354      *
355      * @param command The command to send
356      * @return A future that completes with true if the publishing worked and false if it is a readonly topic
357      *         and exceptionally otherwise.
358      */
359     public CompletableFuture<Boolean> publishValue(Command command) {
360         final MqttBrokerConnection connection = this.connection;
361
362         if (connection == null) {
363             CompletableFuture<Boolean> f = new CompletableFuture<>();
364             f.completeExceptionally(new IllegalStateException(
365                     "The connection object has not been set. start() should have been called!"));
366             return f;
367         }
368
369         Command mqttCommandValue = cachedValue.parseCommand(command);
370         Value mqttFormatter = cachedValue;
371
372         if (readOnly) {
373             logger.debug(
374                     "You have tried to publish {} to the mqtt topic '{}' that was marked read-only. You can't 'set' anything on a sensor state topic for example.",
375                     mqttCommandValue, config.commandTopic);
376             return CompletableFuture.completedFuture(false);
377         }
378
379         // Outgoing transformations
380         for (ChannelStateTransformation t : transformationsOut) {
381             String commandString = mqttFormatter.getMQTTpublishValue(mqttCommandValue, null);
382             String transformedValue = t.processValue(commandString);
383             if (transformedValue != null) {
384                 mqttFormatter = new TextValue();
385                 mqttCommandValue = new StringType(transformedValue);
386             } else {
387                 logger.debug("Transformation '{}' returned null on '{}', discarding message", mqttCommandValue,
388                         t.serviceName);
389                 return CompletableFuture.completedFuture(false);
390             }
391         }
392
393         String commandString;
394
395         // Formatter: Applied before the channel state value is published to the MQTT broker.
396         if (config.formatBeforePublish.length() > 0) {
397             try {
398                 commandString = mqttFormatter.getMQTTpublishValue(mqttCommandValue, config.formatBeforePublish);
399             } catch (IllegalFormatException e) {
400                 logger.debug("Format pattern incorrect for {}", channelUID, e);
401                 commandString = mqttFormatter.getMQTTpublishValue(mqttCommandValue, null);
402             }
403         } else {
404             commandString = mqttFormatter.getMQTTpublishValue(mqttCommandValue, null);
405         }
406
407         int qos = (config.qos != null) ? config.qos : connection.getQos();
408
409         return connection.publish(config.commandTopic, commandString.getBytes(), qos, config.retained);
410     }
411
412     /**
413      * @return The channelStateUpdateListener
414      */
415     public @Nullable ChannelStateUpdateListener getChannelStateUpdateListener() {
416         return channelStateUpdateListener;
417     }
418
419     /**
420      * @param channelStateUpdateListener The channelStateUpdateListener to set
421      */
422     public void setChannelStateUpdateListener(ChannelStateUpdateListener channelStateUpdateListener) {
423         this.channelStateUpdateListener = channelStateUpdateListener;
424     }
425
426     public @Nullable MqttBrokerConnection getConnection() {
427         return connection;
428     }
429
430     /**
431      * This is for tests only to inject a broker connection. Use
432      * {@link #start(MqttBrokerConnection, ScheduledExecutorService, int)} instead.
433      *
434      * @param connection MQTT Broker connection
435      */
436     public void setConnection(MqttBrokerConnection connection) {
437         this.connection = connection;
438     }
439 }