2 * Copyright (c) 2010-2024 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.mqtt.generic;
15 import java.nio.charset.StandardCharsets;
16 import java.util.ArrayList;
17 import java.util.IllegalFormatException;
18 import java.util.List;
19 import java.util.concurrent.CompletableFuture;
20 import java.util.concurrent.ScheduledExecutorService;
21 import java.util.concurrent.ScheduledFuture;
22 import java.util.concurrent.TimeUnit;
23 import java.util.stream.Stream;
25 import org.eclipse.jdt.annotation.NonNullByDefault;
26 import org.eclipse.jdt.annotation.Nullable;
27 import org.openhab.binding.mqtt.generic.values.TextValue;
28 import org.openhab.binding.mqtt.generic.values.Value;
29 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
30 import org.openhab.core.io.transport.mqtt.MqttMessageSubscriber;
31 import org.openhab.core.library.types.DecimalType;
32 import org.openhab.core.library.types.QuantityType;
33 import org.openhab.core.library.types.StopMoveType;
34 import org.openhab.core.library.types.StringType;
35 import org.openhab.core.thing.ChannelUID;
36 import org.openhab.core.types.Command;
37 import org.openhab.core.types.State;
38 import org.openhab.core.types.Type;
39 import org.openhab.core.types.TypeParser;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
44 * This object consists of a {@link Value}, which is updated on the respective MQTT topic change.
45 * Updates to the value are propagated via the {@link ChannelStateUpdateListener}.
47 * @author David Graeff - Initial contribution
50 public class ChannelState implements MqttMessageSubscriber {
51 private final Logger logger = LoggerFactory.getLogger(ChannelState.class);
53 // Immutable channel configuration
54 protected final boolean readOnly;
55 protected final ChannelUID channelUID;
56 protected final ChannelConfig config;
59 protected final Value cachedValue;
62 private @Nullable MqttBrokerConnection connection;
63 protected final List<ChannelStateTransformation> transformationsIn = new ArrayList<>();
64 protected final List<ChannelStateTransformation> transformationsOut = new ArrayList<>();
65 private @Nullable ChannelStateUpdateListener channelStateUpdateListener;
66 protected boolean hasSubscribed = false;
67 private @Nullable ScheduledFuture<?> scheduledFuture;
68 private CompletableFuture<@Nullable Void> future = CompletableFuture.completedFuture(null);
69 private final Object futureLock = new Object();
72 * Creates a new channel state.
74 * @param config The channel configuration
75 * @param channelUID The channelUID is used for the {@link ChannelStateUpdateListener} to notify about value changes
76 * @param cachedValue MQTT only notifies us once about a value, during the subscribe. The channel state therefore
77 * needs a cache for the current value.
78 * @param channelStateUpdateListener A channel state update listener
80 public ChannelState(ChannelConfig config, ChannelUID channelUID, Value cachedValue,
81 @Nullable ChannelStateUpdateListener channelStateUpdateListener) {
83 this.channelStateUpdateListener = channelStateUpdateListener;
84 this.channelUID = channelUID;
85 this.cachedValue = cachedValue;
86 this.readOnly = config.commandTopic.isBlank();
89 public boolean isReadOnly() {
94 * Add a transformation that is applied for each received MQTT topic value.
95 * The transformations are executed in order.
97 * @param transformation A transformation
99 public void addTransformation(ChannelStateTransformation transformation) {
100 transformationsIn.add(transformation);
103 public void addTransformation(String transformation, TransformationServiceProvider transformationServiceProvider) {
104 parseTransformation(transformation, transformationServiceProvider).forEach(t -> addTransformation(t));
108 * Add a transformation that is applied for each value to be published.
109 * The transformations are executed in order.
111 * @param transformation A transformation
113 public void addTransformationOut(ChannelStateTransformation transformation) {
114 transformationsOut.add(transformation);
117 public void addTransformationOut(String transformation,
118 TransformationServiceProvider transformationServiceProvider) {
119 parseTransformation(transformation, transformationServiceProvider).forEach(t -> addTransformationOut(t));
122 public static Stream<ChannelStateTransformation> parseTransformation(String transformation,
123 TransformationServiceProvider transformationServiceProvider) {
124 String[] transformations = transformation.split("∩");
125 return Stream.of(transformations).filter(t -> !t.isBlank())
126 .map(t -> new ChannelStateTransformation(t, transformationServiceProvider));
130 * Clear transformations
132 public void clearTransformations() {
133 transformationsIn.clear();
134 transformationsOut.clear();
138 * Returns the cached value state object of this message subscriber.
140 * MQTT only notifies us once about a value, during the subscribe.
141 * The channel state therefore needs a cache for the current value.
142 * If MQTT has not yet published a value, the cache might still be in UNDEF state.
145 public Value getCache() {
150 * Return the channelUID
152 public ChannelUID channelUID() {
157 * Incoming message from the MqttBrokerConnection
159 * @param topic The topic. Is the same as the field stateTopic.
160 * @param payload The byte payload. Must be UTF8 encoded text or binary data.
163 public void processMessage(String topic, byte[] payload) {
164 final ChannelStateUpdateListener channelStateUpdateListener = this.channelStateUpdateListener;
165 if (channelStateUpdateListener == null) {
166 logger.warn("MQTT message received for topic {}, but MessageSubscriber object hasn't been started!", topic);
170 if (cachedValue.isBinary()) {
171 cachedValue.update(payload);
172 channelStateUpdateListener.updateChannelState(channelUID, cachedValue.getChannelState());
177 // String value: Apply transformations
178 String strValue = new String(payload, StandardCharsets.UTF_8);
179 for (ChannelStateTransformation t : transformationsIn) {
180 String transformedValue = t.processValue(strValue);
181 if (transformedValue != null) {
182 strValue = transformedValue;
184 logger.debug("Transformation '{}' returned null on '{}', discarding message", strValue, t.serviceName);
190 // Is trigger?: Special handling
191 if (config.trigger) {
192 channelStateUpdateListener.triggerChannel(channelUID, strValue);
197 Command command = TypeParser.parseCommand(cachedValue.getSupportedCommandTypes(), strValue);
198 if (command == null) {
199 logger.warn("Incoming payload '{}' on '{}' not supported by type '{}'", strValue, topic,
200 cachedValue.getClass().getSimpleName());
206 // Map the string to a command, update the cached value and post the command to the framework
208 parsedType = cachedValue.parseMessage(command);
209 } catch (IllegalArgumentException | IllegalStateException e) {
210 logger.warn("Command '{}' from channel '{}' not supported by type '{}': {}", strValue, channelUID,
211 cachedValue.getClass().getSimpleName(), e.getMessage());
216 if (parsedType instanceof State parsedState) {
217 cachedValue.update(parsedState);
219 // things that are only Commands _must_ be posted as a command (like STOP)
220 channelStateUpdateListener.postChannelCommand(channelUID, (Command) parsedType);
225 State newState = cachedValue.getChannelState();
226 // If the user explicitly wants a command sent, not an update, do that. But
227 // we have to check that the state is even possible to send as a command
229 if (config.postCommand && newState instanceof Command newCommand) {
230 channelStateUpdateListener.postChannelCommand(channelUID, newCommand);
232 channelStateUpdateListener.updateChannelState(channelUID, newState);
238 * Returns the state topic. Might be an empty string if this is a stateless channel (TRIGGER kind channel).
240 public String getStateTopic() {
241 return config.stateTopic;
245 * Return the command topic. Might be an empty string, if this is a read-only channel.
247 public String getCommandTopic() {
248 return config.commandTopic;
252 * Returns the channelType ID which also happens to be an item-type
254 public String getItemType() {
255 return cachedValue.getItemType();
259 * Returns true if this is a stateful channel.
261 public boolean isStateful() {
262 return config.retained;
266 * Removes the subscription to the state topic and resets the channelStateUpdateListener.
268 * @return A future that completes with true if unsubscribing from the state topic succeeded.
269 * It completes with false if no connection is established and completes exceptionally otherwise.
271 public CompletableFuture<@Nullable Void> stop() {
272 final MqttBrokerConnection connection = this.connection;
273 if (connection != null && !config.stateTopic.isBlank()) {
274 return connection.unsubscribe(config.stateTopic, this).thenRun(this::internalStop);
277 return CompletableFuture.completedFuture(null);
281 private void internalStop() {
282 logger.debug("Unsubscribed channel {} from topic: {}", this.channelUID, config.stateTopic);
283 this.connection = null;
284 this.channelStateUpdateListener = null;
285 hasSubscribed = false;
286 cachedValue.resetState();
289 private void receivedOrTimeout() {
290 final ScheduledFuture<?> scheduledFuture = this.scheduledFuture;
291 if (scheduledFuture != null) { // Cancel timeout
292 scheduledFuture.cancel(false);
293 this.scheduledFuture = null;
295 future.complete(null);
298 private @Nullable Void subscribeFail(Throwable e) {
299 final ScheduledFuture<?> scheduledFuture = this.scheduledFuture;
300 if (scheduledFuture != null) { // Cancel timeout
301 scheduledFuture.cancel(false);
302 this.scheduledFuture = null;
304 future.completeExceptionally(e);
309 * Subscribes to the state topic on the given connection and informs about updates on the given listener.
311 * @param connection A broker connection
312 * @param scheduler A scheduler to realize the timeout
313 * @param timeout A timeout in milliseconds. Can be 0 to disable the timeout and let the future return earlier.
314 * @return A future that completes with true if the subscribing worked, with false if the stateTopic is not set
315 * and exceptionally otherwise.
317 public CompletableFuture<@Nullable Void> start(MqttBrokerConnection connection, ScheduledExecutorService scheduler,
319 synchronized (futureLock) {
320 // if the connection is still the same, the subscription is still present, otherwise we need to renew
321 if ((hasSubscribed || !future.isDone()) && connection.equals(this.connection)) {
324 hasSubscribed = false;
326 this.connection = connection;
328 if (config.stateTopic.isBlank()) {
329 return CompletableFuture.completedFuture(null);
332 this.future = new CompletableFuture<>();
334 connection.subscribe(config.stateTopic, this).thenRun(() -> {
335 hasSubscribed = true;
336 logger.debug("Subscribed channel {} to topic: {}", this.channelUID, config.stateTopic);
337 if (timeout > 0 && !future.isDone()) {
338 this.scheduledFuture = scheduler.schedule(this::receivedOrTimeout, timeout, TimeUnit.MILLISECONDS);
342 }).exceptionally(this::subscribeFail);
347 * Return true if this channel has subscribed to its MQTT topics.
348 * You need to call {@link #start(MqttBrokerConnection, ScheduledExecutorService, int)} and
349 * have a stateTopic set, to subscribe this channel.
351 public boolean hasSubscribed() {
352 return this.hasSubscribed;
356 * Publishes a value on MQTT. A command topic needs to be set in the configuration.
358 * @param command The command to send
359 * @return A future that completes with true if the publishing worked and false if it is a readonly topic
360 * and exceptionally otherwise.
362 public CompletableFuture<Boolean> publishValue(Command command) {
363 final MqttBrokerConnection connection = this.connection;
365 if (connection == null) {
366 CompletableFuture<Boolean> f = new CompletableFuture<>();
367 f.completeExceptionally(new IllegalStateException(
368 "The connection object has not been set. start() should have been called!"));
372 Command mqttCommandValue = cachedValue.parseCommand(command);
373 Value mqttFormatter = cachedValue;
377 "You have tried to publish {} to the mqtt topic '{}' that was marked read-only. You can't 'set' anything on a sensor state topic for example.",
378 mqttCommandValue, config.commandTopic);
379 return CompletableFuture.completedFuture(false);
382 // Outgoing transformations
383 for (ChannelStateTransformation t : transformationsOut) {
384 Command cValue = mqttCommandValue;
385 // Only pass numeric value for QuantityType.
386 if (mqttCommandValue instanceof QuantityType<?> qtCommandValue) {
387 cValue = new DecimalType(qtCommandValue.toBigDecimal());
390 String commandString = mqttFormatter.getMQTTpublishValue(cValue, "%s");
391 String transformedValue = t.processValue(commandString);
392 if (transformedValue != null) {
393 mqttFormatter = new TextValue();
394 mqttCommandValue = new StringType(transformedValue);
396 logger.debug("Transformation '{}' returned null on '{}', discarding message", mqttCommandValue,
398 return CompletableFuture.completedFuture(false);
402 String commandString;
404 // Formatter: Applied before the channel state value is published to the MQTT broker.
405 if (config.formatBeforePublish.length() > 0) {
407 Command cValue = mqttCommandValue;
408 // Only pass numeric value for QuantityType of format pattern is %s.
409 if ((mqttCommandValue instanceof QuantityType<?> qtCommandValue)
410 && ("%s".equals(config.formatBeforePublish) || "%S".equals(config.formatBeforePublish))) {
411 cValue = new DecimalType(qtCommandValue.toBigDecimal());
413 commandString = mqttFormatter.getMQTTpublishValue(cValue, config.formatBeforePublish);
414 } catch (IllegalFormatException e) {
415 logger.debug("Format pattern incorrect for {}", channelUID, e);
416 commandString = mqttFormatter.getMQTTpublishValue(mqttCommandValue, null);
419 commandString = mqttFormatter.getMQTTpublishValue(mqttCommandValue, null);
422 int qos = (config.qos != null) ? config.qos : connection.getQos();
425 if (command.equals(StopMoveType.STOP) && !config.stopCommandTopic.isEmpty()) {
426 commandTopic = config.stopCommandTopic;
428 commandTopic = config.commandTopic;
430 return connection.publish(commandTopic, commandString.getBytes(), qos, config.retained);
434 * @return The channelStateUpdateListener
436 public @Nullable ChannelStateUpdateListener getChannelStateUpdateListener() {
437 return channelStateUpdateListener;
441 * @param channelStateUpdateListener The channelStateUpdateListener to set
443 public void setChannelStateUpdateListener(ChannelStateUpdateListener channelStateUpdateListener) {
444 this.channelStateUpdateListener = channelStateUpdateListener;
447 public @Nullable MqttBrokerConnection getConnection() {
452 * This is for tests only to inject a broker connection. Use
453 * {@link #start(MqttBrokerConnection, ScheduledExecutorService, int)} instead.
455 * @param connection MQTT Broker connection
457 public void setConnection(MqttBrokerConnection connection) {
458 this.connection = connection;