2 * Copyright (c) 2010-2024 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.mqtt.generic.internal.handler;
15 import java.util.ArrayList;
16 import java.util.HashMap;
17 import java.util.List;
19 import java.util.Optional;
20 import java.util.concurrent.CompletableFuture;
21 import java.util.stream.Collectors;
23 import javax.measure.Unit;
25 import org.eclipse.jdt.annotation.NonNullByDefault;
26 import org.eclipse.jdt.annotation.Nullable;
27 import org.openhab.binding.mqtt.generic.AbstractMQTTThingHandler;
28 import org.openhab.binding.mqtt.generic.ChannelConfig;
29 import org.openhab.binding.mqtt.generic.ChannelState;
30 import org.openhab.binding.mqtt.generic.ChannelStateUpdateListener;
31 import org.openhab.binding.mqtt.generic.MqttChannelStateDescriptionProvider;
32 import org.openhab.binding.mqtt.generic.TransformationServiceProvider;
33 import org.openhab.binding.mqtt.generic.internal.MqttBindingConstants;
34 import org.openhab.binding.mqtt.generic.utils.FutureCollector;
35 import org.openhab.binding.mqtt.generic.values.Value;
36 import org.openhab.binding.mqtt.generic.values.ValueFactory;
37 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
38 import org.openhab.core.thing.Channel;
39 import org.openhab.core.thing.ChannelUID;
40 import org.openhab.core.thing.Thing;
41 import org.openhab.core.thing.ThingStatus;
42 import org.openhab.core.thing.ThingStatusDetail;
43 import org.openhab.core.thing.binding.ThingHandlerCallback;
44 import org.openhab.core.thing.binding.builder.ChannelBuilder;
45 import org.openhab.core.thing.binding.builder.ThingBuilder;
46 import org.openhab.core.thing.type.ChannelTypeUID;
47 import org.openhab.core.types.StateDescription;
48 import org.openhab.core.types.util.UnitUtils;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
53 * This handler manages manual created Things with manually added channels to link to MQTT topics.
55 * @author David Graeff - Initial contribution
58 public class GenericMQTTThingHandler extends AbstractMQTTThingHandler implements ChannelStateUpdateListener {
59 private final Logger logger = LoggerFactory.getLogger(GenericMQTTThingHandler.class);
60 final Map<ChannelUID, ChannelState> channelStateByChannelUID = new HashMap<>();
61 protected final MqttChannelStateDescriptionProvider stateDescProvider;
62 protected final TransformationServiceProvider transformationServiceProvider;
65 * Creates a new Thing handler for generic MQTT channels.
67 * @param thing The thing of this handler
68 * @param stateDescProvider A channel state provider
69 * @param transformationServiceProvider The transformation service provider
70 * @param subscribeTimeout The subscribe timeout
72 public GenericMQTTThingHandler(Thing thing, MqttChannelStateDescriptionProvider stateDescProvider,
73 TransformationServiceProvider transformationServiceProvider, int subscribeTimeout) {
74 super(thing, subscribeTimeout);
75 this.stateDescProvider = stateDescProvider;
76 this.transformationServiceProvider = transformationServiceProvider;
80 public @Nullable ChannelState getChannelState(ChannelUID channelUID) {
81 return channelStateByChannelUID.get(channelUID);
85 * Subscribe on all channel static topics on all {@link ChannelState}s.
86 * If subscribing on all channels worked, the thing is put ONLINE, else OFFLINE.
88 * @param connection A started broker connection
91 protected CompletableFuture<@Nullable Void> start(MqttBrokerConnection connection) {
92 // availability topics are also started asynchronously, so no problem here
93 clearAllAvailabilityTopics();
94 initializeAvailabilityTopicsFromConfig();
95 return channelStateByChannelUID.values().stream().map(c -> c.start(connection, scheduler, 0))
96 .collect(FutureCollector.allOf()).thenRun(() -> calculateAndUpdateThingStatus(false));
100 protected void stop() {
101 channelStateByChannelUID.values().forEach(c -> c.getCache().resetState());
106 public void dispose() {
107 // Remove all state descriptions of this handler
108 channelStateByChannelUID.forEach((uid, state) -> stateDescProvider.remove(uid));
110 // there is a design flaw, we can't clean up our stuff because it is needed by the super-class on disposal for
112 channelStateByChannelUID.clear();
116 public CompletableFuture<Void> unsubscribeAll() {
117 return CompletableFuture.allOf(
118 channelStateByChannelUID.values().stream().map(ChannelState::stop).toArray(CompletableFuture[]::new));
122 * For every Thing channel there exists a corresponding {@link ChannelState}. It consists of the MQTT state
123 * and MQTT command topic, the ChannelUID and a value state.
125 * @param channelConfig The channel configuration that contains MQTT state and command topic and multiple other
127 * @param channelUID The channel UID
128 * @param valueState The channel value state
131 protected ChannelState createChannelState(ChannelConfig channelConfig, ChannelUID channelUID, Value valueState) {
132 ChannelState state = new ChannelState(channelConfig, channelUID, valueState, this);
134 // Incoming value transformations
135 state.addTransformation(channelConfig.transformationPattern, transformationServiceProvider);
136 // Outgoing value transformations
137 state.addTransformationOut(channelConfig.transformationPatternOut, transformationServiceProvider);
143 public void initialize() {
144 initializeAvailabilityTopicsFromConfig();
146 ThingHandlerCallback callback = getCallback();
147 if (callback == null) {
148 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.NONE, "Framework failure: callback must not be null");
152 ThingBuilder thingBuilder = editThing();
153 boolean modified = false;
155 List<ChannelUID> configErrors = new ArrayList<>();
156 for (Channel channel : thing.getChannels()) {
157 final ChannelTypeUID channelTypeUID = channel.getChannelTypeUID();
158 if (channelTypeUID == null) {
159 logger.warn("Channel {} has no type", channel.getLabel());
162 final ChannelConfig channelConfig = channel.getConfiguration().as(ChannelConfig.class);
165 .equals(new ChannelTypeUID(MqttBindingConstants.BINDING_ID, MqttBindingConstants.NUMBER))) {
166 Unit<?> unit = UnitUtils.parseUnit(channelConfig.unit);
167 String dimension = unit == null ? null : UnitUtils.getDimensionName(unit);
168 String expectedItemType = dimension == null ? "Number" : "Number:" + dimension; // unknown dimension ->
170 String actualItemType = channel.getAcceptedItemType();
171 if (!expectedItemType.equals(actualItemType)) {
172 ChannelBuilder channelBuilder = callback.createChannelBuilder(channel.getUID(), channelTypeUID)
173 .withAcceptedItemType(expectedItemType).withConfiguration(channel.getConfiguration());
174 String label = channel.getLabel();
176 channelBuilder.withLabel(label);
178 String description = channel.getDescription();
179 if (description != null) {
180 channelBuilder.withDescription(description);
182 thingBuilder.withoutChannel(channel.getUID());
183 thingBuilder.withChannel(channelBuilder.build());
189 Value value = ValueFactory.createValueState(channelConfig, channelTypeUID.getId());
190 ChannelState channelState = createChannelState(channelConfig, channel.getUID(), value);
191 channelStateByChannelUID.put(channel.getUID(), channelState);
192 StateDescription description = value.createStateDescription(channelConfig.commandTopic.isBlank())
193 .build().toStateDescription();
194 if (description != null) {
195 stateDescProvider.setDescription(channel.getUID(), description);
197 } catch (IllegalArgumentException e) {
198 logger.warn("Configuration error for channel '{}'", channel.getUID(), e);
199 configErrors.add(channel.getUID());
204 updateThing(thingBuilder.build());
207 // If some channels could not start up, put the entire thing offline and display the channels
208 // in question to the user.
209 if (!configErrors.isEmpty()) {
210 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_ERROR, "Remove and recreate: "
211 + configErrors.stream().map(ChannelUID::getAsString).collect(Collectors.joining(",")));
218 protected void updateThingStatus(boolean messageReceived, Optional<Boolean> availibilityTopicsSeen) {
219 if (availibilityTopicsSeen.orElse(true)) {
220 updateStatus(ThingStatus.ONLINE, ThingStatusDetail.NONE);
222 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.NONE);
226 private void initializeAvailabilityTopicsFromConfig() {
227 GenericThingConfiguration config = getConfigAs(GenericThingConfiguration.class);
229 String availabilityTopic = config.availabilityTopic;
231 if (availabilityTopic != null) {
232 addAvailabilityTopic(availabilityTopic, config.payloadAvailable, config.payloadNotAvailable,
233 config.transformationPattern, transformationServiceProvider);
235 clearAllAvailabilityTopics();