/**
 * Copyright (c) 2010-2024 Contributors to the openHAB project
 *
 * See the NOTICE file(s) distributed with this work for additional
 * information.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License 2.0 which is available at
 * http://www.eclipse.org/legal/epl-2.0
 *
 * SPDX-License-Identifier: EPL-2.0
 */
13 package org.openhab.binding.mqtt.generic;
15 import java.nio.charset.StandardCharsets;
16 import java.util.IllegalFormatException;
17 import java.util.Optional;
18 import java.util.concurrent.CompletableFuture;
19 import java.util.concurrent.ScheduledExecutorService;
20 import java.util.concurrent.ScheduledFuture;
21 import java.util.concurrent.TimeUnit;
23 import org.eclipse.jdt.annotation.NonNullByDefault;
24 import org.eclipse.jdt.annotation.Nullable;
25 import org.openhab.binding.mqtt.generic.values.TextValue;
26 import org.openhab.binding.mqtt.generic.values.Value;
27 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
28 import org.openhab.core.io.transport.mqtt.MqttMessageSubscriber;
29 import org.openhab.core.library.types.DecimalType;
30 import org.openhab.core.library.types.QuantityType;
31 import org.openhab.core.library.types.StopMoveType;
32 import org.openhab.core.library.types.StringType;
33 import org.openhab.core.thing.ChannelUID;
34 import org.openhab.core.thing.binding.generic.ChannelTransformation;
35 import org.openhab.core.types.Command;
36 import org.openhab.core.types.State;
37 import org.openhab.core.types.Type;
38 import org.openhab.core.types.TypeParser;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
/**
 * This object consists of a {@link Value}, which is updated on the respective MQTT topic change.
 * Updates to the value are propagated via the {@link ChannelStateUpdateListener}.
 *
 * @author David Graeff - Initial contribution
 */
49 public class ChannelState implements MqttMessageSubscriber {
50 private final Logger logger = LoggerFactory.getLogger(ChannelState.class);
52 // Immutable channel configuration
53 protected final boolean readOnly;
54 protected final ChannelConfig config;
57 protected final Value cachedValue;
60 protected ChannelUID channelUID;
61 private @Nullable MqttBrokerConnection connection;
62 protected final ChannelTransformation incomingTransformation;
63 protected final ChannelTransformation outgoingTransformation;
64 private @Nullable ChannelStateUpdateListener channelStateUpdateListener;
65 protected boolean hasSubscribed = false;
66 private @Nullable ScheduledFuture<?> scheduledFuture;
67 private CompletableFuture<@Nullable Void> future = CompletableFuture.completedFuture(null);
68 private final Object futureLock = new Object();
/**
 * Creates a new channel state. The incoming and outgoing transformations are built from the
 * {@code transformationPattern} and {@code transformationPatternOut} entries of the given configuration.
 *
 * @param config The channel configuration
 * @param channelUID The channelUID is used for the {@link ChannelStateUpdateListener} to notify about value
 *            changes
 * @param cachedValue MQTT only notifies us once about a value, during the subscribe. The channel state
 *            therefore needs a cache for the current value.
 * @param channelStateUpdateListener A channel state update listener, may be null
 */
public ChannelState(ChannelConfig config, ChannelUID channelUID, Value cachedValue,
        @Nullable ChannelStateUpdateListener channelStateUpdateListener) {
    // Delegate to the full constructor with transformations derived from the configuration.
    this(config, channelUID, cachedValue, channelStateUpdateListener,
            new ChannelTransformation(config.transformationPattern),
            new ChannelTransformation(config.transformationPatternOut));
}
/**
 * Creates a new channel state with explicitly supplied transformations.
 * <p>
 * The channel is considered read-only if the command topic of the configuration is blank. A {@code null}
 * transformation argument is replaced by a {@link ChannelTransformation} created from a null pattern.
 *
 * @param config The channel configuration
 * @param channelUID The channelUID is used for the {@link ChannelStateUpdateListener} to notify about value
 *            changes
 * @param cachedValue MQTT only notifies us once about a value, during the subscribe. The channel state
 *            therefore needs a cache for the current value.
 * @param channelStateUpdateListener A channel state update listener, may be null
 * @param incomingTransformation A transformation to apply to incoming values, may be null
 * @param outgoingTransformation A transformation to apply to outgoing values, may be null
 */
public ChannelState(ChannelConfig config, ChannelUID channelUID, Value cachedValue,
        @Nullable ChannelStateUpdateListener channelStateUpdateListener,
        @Nullable ChannelTransformation incomingTransformation,
        @Nullable ChannelTransformation outgoingTransformation) {
    this.config = config;
    this.channelUID = channelUID;
    this.cachedValue = cachedValue;
    this.channelStateUpdateListener = channelStateUpdateListener;
    this.readOnly = config.commandTopic.isBlank();

    if (incomingTransformation != null) {
        this.incomingTransformation = incomingTransformation;
    } else {
        this.incomingTransformation = new ChannelTransformation((String) null);
    }

    if (outgoingTransformation != null) {
        this.outgoingTransformation = outgoingTransformation;
    } else {
        this.outgoingTransformation = new ChannelTransformation((String) null);
    }
}
112 public boolean isReadOnly() {
113 return this.readOnly;
117 * Returns the cached value state object of this message subscriber.
119 * MQTT only notifies us once about a value, during the subscribe.
120 * The channel state therefore needs a cache for the current value.
121 * If MQTT has not yet published a value, the cache might still be in UNDEF state.
124 public Value getCache() {
129 * Return the channelUID
131 public ChannelUID channelUID() {
135 // If the UID of the channel changed after it was initially created
136 public void setChannelUID(ChannelUID channelUID) {
137 this.channelUID = channelUID;
141 * Incoming message from the MqttBrokerConnection
143 * @param topic The topic. Is the same as the field stateTopic.
144 * @param payload The byte payload. Must be UTF8 encoded text or binary data.
147 public void processMessage(String topic, byte[] payload) {
148 final ChannelStateUpdateListener channelStateUpdateListener = this.channelStateUpdateListener;
149 if (channelStateUpdateListener == null) {
150 logger.warn("MQTT message received for topic {}, but MessageSubscriber object hasn't been started!", topic);
154 if (cachedValue.isBinary()) {
155 cachedValue.update(payload);
156 channelStateUpdateListener.updateChannelState(channelUID, cachedValue.getChannelState());
161 // String value: Apply transformations
162 String strValue = new String(payload, StandardCharsets.UTF_8);
163 if (incomingTransformation.isPresent()) {
164 Optional<String> transformedValue = incomingTransformation.apply(strValue);
165 if (transformedValue.isEmpty()) {
166 logger.debug("Transformation '{}' returned null on '{}', discarding message", strValue,
167 incomingTransformation);
171 strValue = transformedValue.get();
174 // Is trigger?: Special handling
175 if (config.trigger) {
176 channelStateUpdateListener.triggerChannel(channelUID, strValue);
181 Command command = TypeParser.parseCommand(cachedValue.getSupportedCommandTypes(), strValue);
182 if (command == null) {
183 logger.warn("Incoming payload '{}' on '{}' not supported by type '{}'", strValue, topic,
184 cachedValue.getClass().getSimpleName());
190 // Map the string to a command, update the cached value and post the command to the framework
192 parsedType = cachedValue.parseMessage(command);
193 } catch (IllegalArgumentException | IllegalStateException e) {
194 logger.warn("Command '{}' from channel '{}' not supported by type '{}': {}", strValue, channelUID,
195 cachedValue.getClass().getSimpleName(), e.getMessage());
200 if (parsedType instanceof State parsedState) {
201 cachedValue.update(parsedState);
203 // things that are only Commands _must_ be posted as a command (like STOP)
204 channelStateUpdateListener.postChannelCommand(channelUID, (Command) parsedType);
209 State newState = cachedValue.getChannelState();
210 // If the user explicitly wants a command sent, not an update, do that. But
211 // we have to check that the state is even possible to send as a command
213 if (config.postCommand && newState instanceof Command newCommand) {
214 channelStateUpdateListener.postChannelCommand(channelUID, newCommand);
216 channelStateUpdateListener.updateChannelState(channelUID, newState);
222 * Returns the state topic. Might be an empty string if this is a stateless channel (TRIGGER kind channel).
224 public String getStateTopic() {
225 return config.stateTopic;
229 * Return the command topic. Might be an empty string, if this is a read-only channel.
231 public String getCommandTopic() {
232 return config.commandTopic;
236 * Returns the channelType ID which also happens to be an item-type
238 public String getItemType() {
239 return cachedValue.getItemType();
243 * Returns true if this is a stateful channel.
245 public boolean isStateful() {
246 return config.retained;
250 * Removes the subscription to the state topic and resets the channelStateUpdateListener.
252 * @return A future that completes with true if unsubscribing from the state topic succeeded.
253 * It completes with false if no connection is established and completes exceptionally otherwise.
255 public CompletableFuture<@Nullable Void> stop() {
256 final MqttBrokerConnection connection = this.connection;
257 if (connection != null && !config.stateTopic.isBlank()) {
258 return connection.unsubscribe(config.stateTopic, this).thenRun(this::internalStop);
261 return CompletableFuture.completedFuture(null);
265 private void internalStop() {
266 logger.debug("Unsubscribed channel {} from topic: {}", this.channelUID, config.stateTopic);
267 this.connection = null;
268 this.channelStateUpdateListener = null;
269 hasSubscribed = false;
270 cachedValue.resetState();
273 private void receivedOrTimeout() {
274 final ScheduledFuture<?> scheduledFuture = this.scheduledFuture;
275 if (scheduledFuture != null) { // Cancel timeout
276 scheduledFuture.cancel(false);
277 this.scheduledFuture = null;
279 future.complete(null);
282 private @Nullable Void subscribeFail(Throwable e) {
283 final ScheduledFuture<?> scheduledFuture = this.scheduledFuture;
284 if (scheduledFuture != null) { // Cancel timeout
285 scheduledFuture.cancel(false);
286 this.scheduledFuture = null;
288 future.completeExceptionally(e);
293 * Subscribes to the state topic on the given connection and informs about updates on the given listener.
295 * @param connection A broker connection
296 * @param scheduler A scheduler to realize the timeout
297 * @param timeout A timeout in milliseconds. Can be 0 to disable the timeout and let the future return earlier.
298 * @return A future that completes with true if the subscribing worked, with false if the stateTopic is not set
299 * and exceptionally otherwise.
301 public CompletableFuture<@Nullable Void> start(MqttBrokerConnection connection, ScheduledExecutorService scheduler,
303 synchronized (futureLock) {
304 // if the connection is still the same, the subscription is still present, otherwise we need to renew
305 if ((hasSubscribed || !future.isDone()) && connection.equals(this.connection)) {
308 hasSubscribed = false;
310 this.connection = connection;
312 if (config.stateTopic.isBlank()) {
313 return CompletableFuture.completedFuture(null);
316 this.future = new CompletableFuture<>();
318 connection.subscribe(config.stateTopic, this).thenRun(() -> {
319 hasSubscribed = true;
320 logger.debug("Subscribed channel {} to topic: {}", this.channelUID, config.stateTopic);
321 if (timeout > 0 && !future.isDone()) {
322 this.scheduledFuture = scheduler.schedule(this::receivedOrTimeout, timeout, TimeUnit.MILLISECONDS);
326 }).exceptionally(this::subscribeFail);
331 * Return true if this channel has subscribed to its MQTT topics.
332 * You need to call {@link #start(MqttBrokerConnection, ScheduledExecutorService, int)} and
333 * have a stateTopic set, to subscribe this channel.
335 public boolean hasSubscribed() {
336 return this.hasSubscribed;
340 * Publishes a value on MQTT. A command topic needs to be set in the configuration.
342 * @param command The command to send
343 * @return A future that completes with true if the publishing worked and false if it is a readonly topic
344 * and exceptionally otherwise.
346 public CompletableFuture<Boolean> publishValue(Command command) {
347 final MqttBrokerConnection connection = this.connection;
349 if (connection == null) {
350 CompletableFuture<Boolean> f = new CompletableFuture<>();
351 f.completeExceptionally(new IllegalStateException(
352 "The connection object has not been set. start() should have been called!"));
356 Command mqttCommandValue = cachedValue.parseCommand(command);
357 Value mqttFormatter = cachedValue;
361 "You have tried to publish {} to the mqtt topic '{}' that was marked read-only. You can't 'set' anything on a sensor state topic for example.",
362 mqttCommandValue, config.commandTopic);
363 return CompletableFuture.completedFuture(false);
366 // Outgoing transformations
367 if (outgoingTransformation.isPresent()) {
368 Command cValue = mqttCommandValue;
369 // Only pass numeric value for QuantityType.
370 if (mqttCommandValue instanceof QuantityType<?> qtCommandValue) {
371 cValue = new DecimalType(qtCommandValue.toBigDecimal());
373 String commandString = mqttFormatter.getMQTTpublishValue(cValue, "%s");
374 Optional<String> transformedValue = outgoingTransformation.apply(commandString);
375 if (transformedValue.isEmpty()) {
376 logger.debug("Transformation '{}' returned null on '{}', discarding message", outgoingTransformation,
378 return CompletableFuture.completedFuture(false);
381 mqttFormatter = new TextValue();
382 mqttCommandValue = new StringType(transformedValue.get());
385 String commandString;
387 // Formatter: Applied before the channel state value is published to the MQTT broker.
388 if (config.formatBeforePublish.length() > 0) {
390 Command cValue = mqttCommandValue;
391 // Only pass numeric value for QuantityType of format pattern is %s.
392 if ((mqttCommandValue instanceof QuantityType<?> qtCommandValue)
393 && ("%s".equals(config.formatBeforePublish) || "%S".equals(config.formatBeforePublish))) {
394 cValue = new DecimalType(qtCommandValue.toBigDecimal());
396 commandString = mqttFormatter.getMQTTpublishValue(cValue, config.formatBeforePublish);
397 } catch (IllegalFormatException e) {
398 logger.debug("Format pattern incorrect for {}", channelUID, e);
399 commandString = mqttFormatter.getMQTTpublishValue(mqttCommandValue, null);
402 commandString = mqttFormatter.getMQTTpublishValue(mqttCommandValue, null);
405 int qos = (config.qos != null) ? config.qos : connection.getQos();
408 if (command.equals(StopMoveType.STOP) && !config.stopCommandTopic.isEmpty()) {
409 commandTopic = config.stopCommandTopic;
411 commandTopic = config.commandTopic;
413 return connection.publish(commandTopic, commandString.getBytes(), qos, config.retained);
417 * @return The channelStateUpdateListener
419 public @Nullable ChannelStateUpdateListener getChannelStateUpdateListener() {
420 return channelStateUpdateListener;
424 * @param channelStateUpdateListener The channelStateUpdateListener to set
426 public void setChannelStateUpdateListener(ChannelStateUpdateListener channelStateUpdateListener) {
427 this.channelStateUpdateListener = channelStateUpdateListener;
430 public @Nullable MqttBrokerConnection getConnection() {
435 * This is for tests only to inject a broker connection. Use
436 * {@link #start(MqttBrokerConnection, ScheduledExecutorService, int)} instead.
438 * @param connection MQTT Broker connection
440 public void setConnection(MqttBrokerConnection connection) {
441 this.connection = connection;