2 * Copyright (c) 2010-2020 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.mqtt.generic;
15 import java.nio.charset.StandardCharsets;
16 import java.util.ArrayList;
17 import java.util.IllegalFormatException;
18 import java.util.List;
19 import java.util.concurrent.CompletableFuture;
20 import java.util.concurrent.ScheduledExecutorService;
21 import java.util.concurrent.ScheduledFuture;
22 import java.util.concurrent.TimeUnit;
24 import org.apache.commons.lang.StringUtils;
25 import org.eclipse.jdt.annotation.NonNullByDefault;
26 import org.eclipse.jdt.annotation.Nullable;
27 import org.openhab.binding.mqtt.generic.values.TextValue;
28 import org.openhab.binding.mqtt.generic.values.Value;
29 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
30 import org.openhab.core.io.transport.mqtt.MqttMessageSubscriber;
31 import org.openhab.core.library.types.StringType;
32 import org.openhab.core.thing.ChannelUID;
33 import org.openhab.core.types.Command;
34 import org.openhab.core.types.TypeParser;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
39 * This object consists of an {@link Value}, which is updated on the respective MQTT topic change.
40 * Updates to the value are propagated via the {@link ChannelStateUpdateListener}.
42 * @author David Graeff - Initial contribution
45 public class ChannelState implements MqttMessageSubscriber {
46 private final Logger logger = LoggerFactory.getLogger(ChannelState.class);
48 // Immutable channel configuration
49 protected final boolean readOnly;
50 protected final ChannelUID channelUID;
51 protected final ChannelConfig config;
54 protected final Value cachedValue;
57 private @Nullable MqttBrokerConnection connection;
58 protected final List<ChannelStateTransformation> transformationsIn = new ArrayList<>();
59 protected final List<ChannelStateTransformation> transformationsOut = new ArrayList<>();
60 private @Nullable ChannelStateUpdateListener channelStateUpdateListener;
61 protected boolean hasSubscribed = false;
62 private @Nullable ScheduledFuture<?> scheduledFuture;
63 private CompletableFuture<@Nullable Void> future = CompletableFuture.completedFuture(null);
64 private final Object futureLock = new Object();
67 * Creates a new channel state.
69 * @param config The channel configuration
70 * @param channelUID The channelUID is used for the {@link ChannelStateUpdateListener} to notify about value changes
71 * @param cachedValue MQTT only notifies us once about a value, during the subscribe. The channel state therefore
72 * needs a cache for the current value.
73 * @param channelStateUpdateListener A channel state update listener
75 public ChannelState(ChannelConfig config, ChannelUID channelUID, Value cachedValue,
76 @Nullable ChannelStateUpdateListener channelStateUpdateListener) {
78 this.channelStateUpdateListener = channelStateUpdateListener;
79 this.channelUID = channelUID;
80 this.cachedValue = cachedValue;
81 this.readOnly = StringUtils.isBlank(config.commandTopic);
84 public boolean isReadOnly() {
89 * Add a transformation that is applied for each received MQTT topic value.
90 * The transformations are executed in order.
92 * @param transformation A transformation
94 public void addTransformation(ChannelStateTransformation transformation) {
95 transformationsIn.add(transformation);
99 * Add a transformation that is applied for each value to be published.
100 * The transformations are executed in order.
102 * @param transformation A transformation
104 public void addTransformationOut(ChannelStateTransformation transformation) {
105 transformationsOut.add(transformation);
109 * Clear transformations
111 public void clearTransformations() {
112 transformationsIn.clear();
113 transformationsOut.clear();
117 * Returns the cached value state object of this message subscriber.
119 * MQTT only notifies us once about a value, during the subscribe.
120 * The channel state therefore needs a cache for the current value.
121 * If MQTT has not yet published a value, the cache might still be in UNDEF state.
124 public Value getCache() {
129 * Return the channelUID
131 public ChannelUID channelUID() {
136 * Incoming message from the MqttBrokerConnection
138 * @param topic The topic. Is the same as the field stateTopic.
139 * @param payload The byte payload. Must be UTF8 encoded text or binary data.
142 public void processMessage(String topic, byte[] payload) {
143 final ChannelStateUpdateListener channelStateUpdateListener = this.channelStateUpdateListener;
144 if (channelStateUpdateListener == null) {
145 logger.warn("MQTT message received for topic {}, but MessageSubscriber object hasn't been started!", topic);
149 if (cachedValue.isBinary()) {
150 cachedValue.update(payload);
151 channelStateUpdateListener.updateChannelState(channelUID, cachedValue.getChannelState());
156 // String value: Apply transformations
157 String strValue = new String(payload, StandardCharsets.UTF_8);
158 for (ChannelStateTransformation t : transformationsIn) {
159 String transformedValue = t.processValue(strValue);
160 if (transformedValue != null) {
161 strValue = transformedValue;
163 logger.debug("Transformation '{}' returned null on '{}', discarding message", strValue, t.serviceName);
169 // Is trigger?: Special handling
170 if (config.trigger) {
171 channelStateUpdateListener.triggerChannel(channelUID, strValue);
176 Command command = TypeParser.parseCommand(cachedValue.getSupportedCommandTypes(), strValue);
177 if (command == null) {
178 logger.warn("Incoming payload '{}' not supported by type '{}'", strValue,
179 cachedValue.getClass().getSimpleName());
184 Command postOnlyCommand = cachedValue.isPostOnly(command);
185 if (postOnlyCommand != null) {
186 channelStateUpdateListener.postChannelCommand(channelUID, postOnlyCommand);
191 // Map the string to an ESH command, update the cached value and post the command to the framework
193 cachedValue.update(command);
194 } catch (IllegalArgumentException | IllegalStateException e) {
195 logger.warn("Command '{}' not supported by type '{}': {}", strValue, cachedValue.getClass().getSimpleName(),
201 if (config.postCommand) {
202 channelStateUpdateListener.postChannelCommand(channelUID, (Command) cachedValue.getChannelState());
204 channelStateUpdateListener.updateChannelState(channelUID, cachedValue.getChannelState());
210 * Returns the state topic. Might be an empty string if this is a stateless channel (TRIGGER kind channel).
212 public String getStateTopic() {
213 return config.stateTopic;
217 * Return the command topic. Might be an empty string, if this is a read-only channel.
219 public String getCommandTopic() {
220 return config.commandTopic;
224 * Returns the channelType ID which also happens to be an item-type
226 public String getItemType() {
227 return cachedValue.getItemType();
231 * Returns true if this is a stateful channel.
233 public boolean isStateful() {
234 return config.retained;
238 * Removes the subscription to the state topic and resets the channelStateUpdateListener.
240 * @return A future that completes with true if unsubscribing from the state topic succeeded.
241 * It completes with false if no connection is established and completes exceptionally otherwise.
243 public CompletableFuture<@Nullable Void> stop() {
244 final MqttBrokerConnection connection = this.connection;
245 if (connection != null && StringUtils.isNotBlank(config.stateTopic)) {
246 return connection.unsubscribe(config.stateTopic, this).thenRun(this::internalStop);
249 return CompletableFuture.completedFuture(null);
253 private void internalStop() {
254 logger.debug("Unsubscribed channel {} form topic: {}", this.channelUID, config.stateTopic);
255 this.connection = null;
256 this.channelStateUpdateListener = null;
257 hasSubscribed = false;
258 cachedValue.resetState();
261 private void receivedOrTimeout() {
262 final ScheduledFuture<?> scheduledFuture = this.scheduledFuture;
263 if (scheduledFuture != null) { // Cancel timeout
264 scheduledFuture.cancel(false);
265 this.scheduledFuture = null;
267 future.complete(null);
270 private @Nullable Void subscribeFail(Throwable e) {
271 final ScheduledFuture<?> scheduledFuture = this.scheduledFuture;
272 if (scheduledFuture != null) { // Cancel timeout
273 scheduledFuture.cancel(false);
274 this.scheduledFuture = null;
276 future.completeExceptionally(e);
281 * Subscribes to the state topic on the given connection and informs about updates on the given listener.
283 * @param connection A broker connection
284 * @param scheduler A scheduler to realize the timeout
285 * @param timeout A timeout in milliseconds. Can be 0 to disable the timeout and let the future return earlier.
286 * @return A future that completes with true if the subscribing worked, with false if the stateTopic is not set
287 * and exceptionally otherwise.
289 public CompletableFuture<@Nullable Void> start(MqttBrokerConnection connection, ScheduledExecutorService scheduler,
291 synchronized (futureLock) {
292 // if the connection is still the same, the subscription is still present, otherwise we need to renew
293 if (hasSubscribed || !future.isDone() && connection.equals(this.connection)) {
296 hasSubscribed = false;
298 this.connection = connection;
300 if (StringUtils.isBlank(config.stateTopic)) {
301 return CompletableFuture.completedFuture(null);
304 this.future = new CompletableFuture<>();
306 connection.subscribe(config.stateTopic, this).thenRun(() -> {
307 hasSubscribed = true;
308 logger.debug("Subscribed channel {} to topic: {}", this.channelUID, config.stateTopic);
309 if (timeout > 0 && !future.isDone()) {
310 this.scheduledFuture = scheduler.schedule(this::receivedOrTimeout, timeout, TimeUnit.MILLISECONDS);
314 }).exceptionally(this::subscribeFail);
319 * Return true if this channel has subscribed to its MQTT topics.
320 * You need to call {@link #start(MqttBrokerConnection, ScheduledExecutorService, int)} and
321 * have a stateTopic set, to subscribe this channel.
323 public boolean hasSubscribed() {
324 return this.hasSubscribed;
328 * Publishes a value on MQTT. A command topic needs to be set in the configuration.
330 * @param command The command to send
331 * @return A future that completes with true if the publishing worked and false if it is a readonly topic
332 * and exceptionally otherwise.
334 public CompletableFuture<Boolean> publishValue(Command command) {
335 cachedValue.update(command);
337 Value mqttCommandValue = cachedValue;
339 final MqttBrokerConnection connection = this.connection;
341 if (connection == null) {
342 CompletableFuture<Boolean> f = new CompletableFuture<>();
343 f.completeExceptionally(new IllegalStateException(
344 "The connection object has not been set. start() should have been called!"));
350 "You have tried to publish {} to the mqtt topic '{}' that was marked read-only. You can't 'set' anything on a sensor state topic for example.",
351 mqttCommandValue, config.commandTopic);
352 return CompletableFuture.completedFuture(false);
355 // Outgoing transformations
356 for (ChannelStateTransformation t : transformationsOut) {
357 String commandString = mqttCommandValue.getMQTTpublishValue(null);
358 String transformedValue = t.processValue(commandString);
359 if (transformedValue != null) {
360 Value textValue = new TextValue();
361 textValue.update(new StringType(transformedValue));
362 mqttCommandValue = textValue;
364 logger.debug("Transformation '{}' returned null on '{}', discarding message", mqttCommandValue,
366 return CompletableFuture.completedFuture(false);
370 String commandString;
372 // Formatter: Applied before the channel state value is published to the MQTT broker.
373 if (config.formatBeforePublish.length() > 0) {
375 commandString = mqttCommandValue.getMQTTpublishValue(config.formatBeforePublish);
376 } catch (IllegalFormatException e) {
377 logger.debug("Format pattern incorrect for {}", channelUID, e);
378 commandString = mqttCommandValue.getMQTTpublishValue(null);
381 commandString = mqttCommandValue.getMQTTpublishValue(null);
384 int qos = (config.qos != null) ? config.qos : connection.getQos();
386 return connection.publish(config.commandTopic, commandString.getBytes(), qos, config.retained);
390 * @return The channelStateUpdateListener
392 public @Nullable ChannelStateUpdateListener getChannelStateUpdateListener() {
393 return channelStateUpdateListener;
397 * @param channelStateUpdateListener The channelStateUpdateListener to set
399 public void setChannelStateUpdateListener(ChannelStateUpdateListener channelStateUpdateListener) {
400 this.channelStateUpdateListener = channelStateUpdateListener;
403 public @Nullable MqttBrokerConnection getConnection() {
408 * This is for tests only to inject a broker connection. Use
409 * {@link #start(MqttBrokerConnection, ScheduledExecutorService, int)} instead.
411 * @param connection MQTT Broker connection
413 public void setConnection(MqttBrokerConnection connection) {
414 this.connection = connection;