]> git.basschouten.com Git - openhab-addons.git/blob
e2375aa889c8316898844946f2586de038f796a8
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2020 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.mqtt.generic.internal.handler;
14
15 import java.util.ArrayList;
16 import java.util.HashMap;
17 import java.util.List;
18 import java.util.Map;
19 import java.util.concurrent.CompletableFuture;
20 import java.util.stream.Collectors;
21 import java.util.stream.Stream;
22
23 import org.apache.commons.lang.StringUtils;
24 import org.eclipse.jdt.annotation.NonNullByDefault;
25 import org.eclipse.jdt.annotation.Nullable;
26 import org.openhab.binding.mqtt.generic.AbstractMQTTThingHandler;
27 import org.openhab.binding.mqtt.generic.ChannelConfig;
28 import org.openhab.binding.mqtt.generic.ChannelState;
29 import org.openhab.binding.mqtt.generic.ChannelStateTransformation;
30 import org.openhab.binding.mqtt.generic.ChannelStateUpdateListener;
31 import org.openhab.binding.mqtt.generic.MqttChannelStateDescriptionProvider;
32 import org.openhab.binding.mqtt.generic.TransformationServiceProvider;
33 import org.openhab.binding.mqtt.generic.utils.FutureCollector;
34 import org.openhab.binding.mqtt.generic.values.Value;
35 import org.openhab.binding.mqtt.generic.values.ValueFactory;
36 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
37 import org.openhab.core.thing.Channel;
38 import org.openhab.core.thing.ChannelUID;
39 import org.openhab.core.thing.Thing;
40 import org.openhab.core.thing.ThingStatus;
41 import org.openhab.core.thing.ThingStatusDetail;
42 import org.openhab.core.thing.type.ChannelTypeUID;
43 import org.openhab.core.types.StateDescription;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
46
47 /**
48  * This handler manages manual created Things with manually added channels to link to MQTT topics.
49  *
50  * @author David Graeff - Initial contribution
51  */
52 @NonNullByDefault
53 public class GenericMQTTThingHandler extends AbstractMQTTThingHandler implements ChannelStateUpdateListener {
54     private final Logger logger = LoggerFactory.getLogger(GenericMQTTThingHandler.class);
55     final Map<ChannelUID, ChannelState> channelStateByChannelUID = new HashMap<>();
56     protected final MqttChannelStateDescriptionProvider stateDescProvider;
57     protected final TransformationServiceProvider transformationServiceProvider;
58
59     /**
60      * Creates a new Thing handler for generic MQTT channels.
61      *
62      * @param thing The thing of this handler
63      * @param stateDescProvider A channel state provider
64      * @param transformationServiceProvider The transformation service provider
65      * @param subscribeTimeout The subscribe timeout
66      */
67     public GenericMQTTThingHandler(Thing thing, MqttChannelStateDescriptionProvider stateDescProvider,
68             TransformationServiceProvider transformationServiceProvider, int subscribeTimeout) {
69         super(thing, subscribeTimeout);
70         this.stateDescProvider = stateDescProvider;
71         this.transformationServiceProvider = transformationServiceProvider;
72     }
73
74     @Override
75     public @Nullable ChannelState getChannelState(ChannelUID channelUID) {
76         return channelStateByChannelUID.get(channelUID);
77     }
78
79     /**
80      * Subscribe on all channel static topics on all {@link ChannelState}s.
81      * If subscribing on all channels worked, the thing is put ONLINE, else OFFLINE.
82      *
83      * @param connection A started broker connection
84      */
85     @Override
86     protected CompletableFuture<@Nullable Void> start(MqttBrokerConnection connection) {
87         return channelStateByChannelUID.values().stream().map(c -> c.start(connection, scheduler, 0))
88                 .collect(FutureCollector.allOf()).thenRun(this::calculateThingStatus);
89     }
90
91     @Override
92     protected void stop() {
93         channelStateByChannelUID.values().forEach(c -> c.getCache().resetState());
94         super.stop();
95     }
96
97     @Override
98     public void dispose() {
99         // Remove all state descriptions of this handler
100         channelStateByChannelUID.forEach((uid, state) -> stateDescProvider.remove(uid));
101         super.dispose();
102         // there is a design flaw, we can't clean up our stuff because it is needed by the super-class on disposal for
103         // unsubscribing
104         channelStateByChannelUID.clear();
105     }
106
107     @Override
108     public CompletableFuture<Void> unsubscribeAll() {
109         return CompletableFuture.allOf(
110                 channelStateByChannelUID.values().stream().map(ChannelState::stop).toArray(CompletableFuture[]::new));
111     }
112
113     /**
114      * For every Thing channel there exists a corresponding {@link ChannelState}. It consists of the MQTT state
115      * and MQTT command topic, the ChannelUID and a value state.
116      *
117      * @param channelConfig The channel configuration that contains MQTT state and command topic and multiple other
118      *            configurations.
119      * @param channelUID The channel UID
120      * @param valueState The channel value state
121      * @return
122      */
123     protected ChannelState createChannelState(ChannelConfig channelConfig, ChannelUID channelUID, Value valueState) {
124         ChannelState state = new ChannelState(channelConfig, channelUID, valueState, this);
125         String[] transformations;
126
127         // Incoming value transformations
128         transformations = channelConfig.transformationPattern.split("∩");
129         Stream.of(transformations).filter(StringUtils::isNotBlank)
130                 .map(t -> new ChannelStateTransformation(t, transformationServiceProvider))
131                 .forEach(t -> state.addTransformation(t));
132
133         // Outgoing value transformations
134         transformations = channelConfig.transformationPatternOut.split("∩");
135         Stream.of(transformations).filter(StringUtils::isNotBlank)
136                 .map(t -> new ChannelStateTransformation(t, transformationServiceProvider))
137                 .forEach(t -> state.addTransformationOut(t));
138
139         return state;
140     }
141
142     @Override
143     public void initialize() {
144         GenericThingConfiguration config = getConfigAs(GenericThingConfiguration.class);
145
146         String availabilityTopic = config.availabilityTopic;
147
148         if (availabilityTopic != null) {
149             addAvailabilityTopic(availabilityTopic, config.payloadAvailable, config.payloadNotAvailable);
150         } else {
151             clearAllAvailabilityTopics();
152         }
153
154         List<ChannelUID> configErrors = new ArrayList<>();
155         for (Channel channel : thing.getChannels()) {
156             final ChannelTypeUID channelTypeUID = channel.getChannelTypeUID();
157             if (channelTypeUID == null) {
158                 logger.warn("Channel {} has no type", channel.getLabel());
159                 continue;
160             }
161             final ChannelConfig channelConfig = channel.getConfiguration().as(ChannelConfig.class);
162             try {
163                 Value value = ValueFactory.createValueState(channelConfig, channelTypeUID.getId());
164                 ChannelState channelState = createChannelState(channelConfig, channel.getUID(), value);
165                 channelStateByChannelUID.put(channel.getUID(), channelState);
166                 StateDescription description = value
167                         .createStateDescription(StringUtils.isBlank(channelConfig.commandTopic)).build()
168                         .toStateDescription();
169                 if (description != null) {
170                     stateDescProvider.setDescription(channel.getUID(), description);
171                 }
172             } catch (IllegalArgumentException e) {
173                 logger.warn("Channel configuration error", e);
174                 configErrors.add(channel.getUID());
175             }
176         }
177
178         // If some channels could not start up, put the entire thing offline and display the channels
179         // in question to the user.
180         if (!configErrors.isEmpty()) {
181             updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_ERROR, "Remove and recreate: "
182                     + configErrors.stream().map(ChannelUID::getAsString).collect(Collectors.joining(",")));
183             return;
184         }
185         super.initialize();
186     }
187
188     @Override
189     protected void updateThingStatus(boolean messageReceived, boolean availibilityTopicsSeen) {
190         if (messageReceived || availibilityTopicsSeen) {
191             updateStatus(ThingStatus.ONLINE, ThingStatusDetail.NONE);
192         } else {
193             updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.NONE);
194         }
195     }
196 }