]> git.basschouten.com Git - openhab-addons.git/blob
d68ab44014c8576b98f29a72fcb5e61164821578
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2020 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.mqtt.homeassistant.internal.handler;
14
15 import java.util.Collection;
16 import java.util.HashMap;
17 import java.util.HashSet;
18 import java.util.List;
19 import java.util.Map;
20 import java.util.Set;
21 import java.util.concurrent.CompletableFuture;
22 import java.util.function.Consumer;
23 import java.util.stream.Collectors;
24
25 import org.eclipse.jdt.annotation.NonNullByDefault;
26 import org.eclipse.jdt.annotation.Nullable;
27 import org.openhab.binding.mqtt.generic.AbstractMQTTThingHandler;
28 import org.openhab.binding.mqtt.generic.ChannelState;
29 import org.openhab.binding.mqtt.generic.MqttChannelTypeProvider;
30 import org.openhab.binding.mqtt.generic.TransformationServiceProvider;
31 import org.openhab.binding.mqtt.generic.tools.DelayedBatchProcessing;
32 import org.openhab.binding.mqtt.generic.utils.FutureCollector;
33 import org.openhab.binding.mqtt.homeassistant.generic.internal.MqttBindingConstants;
34 import org.openhab.binding.mqtt.homeassistant.internal.AbstractComponent;
35 import org.openhab.binding.mqtt.homeassistant.internal.CChannel;
36 import org.openhab.binding.mqtt.homeassistant.internal.CFactory;
37 import org.openhab.binding.mqtt.homeassistant.internal.ChannelConfigurationTypeAdapterFactory;
38 import org.openhab.binding.mqtt.homeassistant.internal.DiscoverComponents;
39 import org.openhab.binding.mqtt.homeassistant.internal.DiscoverComponents.ComponentDiscovered;
40 import org.openhab.binding.mqtt.homeassistant.internal.HaID;
41 import org.openhab.binding.mqtt.homeassistant.internal.HandlerConfiguration;
42 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
43 import org.openhab.core.thing.Channel;
44 import org.openhab.core.thing.ChannelUID;
45 import org.openhab.core.thing.Thing;
46 import org.openhab.core.thing.ThingStatus;
47 import org.openhab.core.thing.ThingStatusDetail;
48 import org.openhab.core.thing.ThingTypeUID;
49 import org.openhab.core.thing.ThingUID;
50 import org.openhab.core.thing.type.ChannelDefinition;
51 import org.openhab.core.thing.type.ChannelGroupDefinition;
52 import org.openhab.core.thing.type.ChannelGroupType;
53 import org.openhab.core.thing.type.ThingType;
54 import org.openhab.core.thing.util.ThingHelper;
55 import org.slf4j.Logger;
56 import org.slf4j.LoggerFactory;
57
58 import com.google.gson.Gson;
59 import com.google.gson.GsonBuilder;
60
61 /**
62  * Handles HomeAssistant MQTT object things. Such an HA Object can have multiple HA Components with different instances
63  * of those Components. This handler auto-discovers all available Components and Component Instances and
64  * adds any new appearing components over time.<br>
65  * <br>
66  *
67  * The specification does not cover the case of disappearing Components. This handler doesn't as well therefore.<br>
68  * <br>
69  *
70  * A Component Instance equals an ESH Channel Group and the Component parts equal ESH Channels.<br>
71  * <br>
72  *
73  * If a Components configuration changes, the known ChannelGroupType and ChannelTypes are replaced with the new ones.
74  *
75  * @author David Graeff - Initial contribution
76  */
77 @NonNullByDefault
78 public class HomeAssistantThingHandler extends AbstractMQTTThingHandler
79         implements ComponentDiscovered, Consumer<List<AbstractComponent<?>>> {
80     public static final String AVAILABILITY_CHANNEL = "availability";
81
82     private final Logger logger = LoggerFactory.getLogger(HomeAssistantThingHandler.class);
83
84     protected final MqttChannelTypeProvider channelTypeProvider;
85     public final int attributeReceiveTimeout;
86     protected final DelayedBatchProcessing<AbstractComponent<?>> delayedProcessing;
87     protected final DiscoverComponents discoverComponents;
88
89     private final Gson gson;
90     protected final Map<String, AbstractComponent<?>> haComponents = new HashMap<>();
91
92     protected HandlerConfiguration config = new HandlerConfiguration();
93     private Set<HaID> discoveryHomeAssistantIDs = new HashSet<>();
94
95     protected final TransformationServiceProvider transformationServiceProvider;
96
97     private boolean started;
98
99     /**
100      * Create a new thing handler for HomeAssistant MQTT components.
101      * A channel type provider and a topic value receive timeout must be provided.
102      *
103      * @param thing The thing of this handler
104      * @param channelTypeProvider A channel type provider
105      * @param subscribeTimeout Timeout for the entire tree parsing and subscription. In milliseconds.
106      * @param attributeReceiveTimeout The timeout per attribute field subscription. In milliseconds.
107      */
108     public HomeAssistantThingHandler(Thing thing, MqttChannelTypeProvider channelTypeProvider,
109             TransformationServiceProvider transformationServiceProvider, int subscribeTimeout,
110             int attributeReceiveTimeout) {
111         super(thing, subscribeTimeout);
112         this.gson = new GsonBuilder().registerTypeAdapterFactory(new ChannelConfigurationTypeAdapterFactory()).create();
113         this.channelTypeProvider = channelTypeProvider;
114         this.transformationServiceProvider = transformationServiceProvider;
115         this.attributeReceiveTimeout = attributeReceiveTimeout;
116         this.delayedProcessing = new DelayedBatchProcessing<>(attributeReceiveTimeout, this, scheduler);
117         this.discoverComponents = new DiscoverComponents(thing.getUID(), scheduler, this, this, gson,
118                 this.transformationServiceProvider);
119     }
120
121     @SuppressWarnings({ "null", "unused" })
122     @Override
123     public void initialize() {
124         started = false;
125
126         config = getConfigAs(HandlerConfiguration.class);
127         if (config.topics == null || config.topics.isEmpty()) {
128             updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_ERROR, "Device topics unknown");
129             return;
130         }
131         discoveryHomeAssistantIDs.addAll(HaID.fromConfig(config));
132
133         for (Channel channel : thing.getChannels()) {
134             final String groupID = channel.getUID().getGroupId();
135             if (groupID == null) {
136                 logger.warn("Channel {} has no groupd ID", channel.getLabel());
137                 continue;
138             }
139             // Already restored component?
140             @Nullable
141             AbstractComponent<?> component = haComponents.get(groupID);
142             if (component != null) {
143                 // the types may have been removed in dispose() so we need to add them again
144                 component.addChannelTypes(channelTypeProvider);
145                 continue;
146             }
147
148             HaID haID = HaID.fromConfig(config.basetopic, channel.getConfiguration());
149             discoveryHomeAssistantIDs.add(haID);
150             ThingUID thingUID = channel.getUID().getThingUID();
151             String channelConfigurationJSON = (String) channel.getConfiguration().get("config");
152             if (channelConfigurationJSON == null) {
153                 logger.warn("Provided channel does not have a 'config' configuration key!");
154             } else {
155                 component = CFactory.createComponent(thingUID, haID, channelConfigurationJSON, this, this, gson,
156                         transformationServiceProvider);
157             }
158
159             if (component != null) {
160                 haComponents.put(component.uid().getId(), component);
161                 component.addChannelTypes(channelTypeProvider);
162             } else {
163                 logger.warn("Could not restore component {}", thing);
164             }
165         }
166         updateThingType();
167
168         super.initialize();
169     }
170
171     @Override
172     public void dispose() {
173         // super.dispose() calls stop()
174         super.dispose();
175         haComponents.values().forEach(c -> c.removeChannelTypes(channelTypeProvider));
176     }
177
178     @Override
179     public CompletableFuture<Void> unsubscribeAll() {
180         // already unsubscribed everything by calling stop()
181         return CompletableFuture.allOf();
182     }
183
184     /**
185      * Start a background discovery for the configured HA MQTT object-id.
186      */
187     @Override
188     protected CompletableFuture<@Nullable Void> start(MqttBrokerConnection connection) {
189         started = true;
190
191         connection.setQos(1);
192         updateStatus(ThingStatus.UNKNOWN);
193
194         // Start all known components and channels within the components and put the Thing offline
195         // if any subscribing failed ( == broker connection lost)
196         CompletableFuture<@Nullable Void> future = haComponents.values().parallelStream()
197                 .map(e -> e.start(connection, scheduler, attributeReceiveTimeout))
198                 .reduce(CompletableFuture.completedFuture(null), (a, v) -> a.thenCompose(b -> v)) // reduce to one
199                 .exceptionally(e -> {
200                     updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_ERROR, e.getMessage());
201                     return null;
202                 });
203
204         return future
205                 .thenCompose(b -> discoverComponents.startDiscovery(connection, 0, discoveryHomeAssistantIDs, this));
206     }
207
208     @Override
209     protected void stop() {
210         if (started) {
211             discoverComponents.stopDiscovery();
212             delayedProcessing.join();
213             // haComponents does not need to be synchronised -> the discovery thread is disabled
214             haComponents.values().parallelStream().map(AbstractComponent::stop) //
215                     // we need to join all the stops, otherwise they might not be done when start is called
216                     .collect(FutureCollector.allOf()).join();
217
218             started = false;
219         }
220         super.stop();
221     }
222
223     @SuppressWarnings({ "null", "unused" })
224     @Override
225     public @Nullable ChannelState getChannelState(ChannelUID channelUID) {
226         String groupID = channelUID.getGroupId();
227         if (groupID == null) {
228             return null;
229         }
230         AbstractComponent<?> component;
231         synchronized (haComponents) { // sync whenever discoverComponents is started
232             component = haComponents.get(groupID);
233         }
234         if (component == null) {
235             return null;
236         }
237         CChannel componentChannel = component.channel(channelUID.getIdWithoutGroup());
238         if (componentChannel == null) {
239             return null;
240         }
241         return componentChannel.getState();
242     }
243
244     /**
245      * Callback of {@link DiscoverComponents}. Add to a delayed batch processor.
246      */
247     @Override
248     public void componentDiscovered(HaID homeAssistantTopicID, AbstractComponent<?> component) {
249         delayedProcessing.accept(component);
250     }
251
252     /**
253      * Callback of {@link DelayedBatchProcessing}.
254      * Add all newly discovered components to the Thing and start the components.
255      */
256     @SuppressWarnings("null")
257     @Override
258     public void accept(List<AbstractComponent<?>> discoveredComponentsList) {
259         MqttBrokerConnection connection = this.connection;
260         if (connection == null) {
261             return;
262         }
263
264         synchronized (haComponents) { // sync whenever discoverComponents is started
265             for (AbstractComponent<?> discovered : discoveredComponentsList) {
266                 AbstractComponent<?> known = haComponents.get(discovered.uid().getId());
267                 // Is component already known?
268                 if (known != null) {
269                     if (discovered.getConfigHash() != known.getConfigHash()) {
270                         // Don't wait for the future to complete. We are also not interested in failures.
271                         // The component will be replaced in a moment.
272                         known.stop();
273                     } else {
274                         known.setConfigSeen();
275                         continue;
276                     }
277                 }
278
279                 // Add channel and group types to the types registry
280                 discovered.addChannelTypes(channelTypeProvider);
281                 // Add component to the component map
282                 haComponents.put(discovered.uid().getId(), discovered);
283                 // Start component / Subscribe to channel topics
284                 discovered.start(connection, scheduler, 0).exceptionally(e -> {
285                     logger.warn("Failed to start component {}", discovered.uid(), e);
286                     return null;
287                 });
288
289                 Collection<Channel> channels = discovered.channelTypes().values().stream().map(CChannel::getChannel)
290                         .collect(Collectors.toList());
291                 ThingHelper.addChannelsToThing(thing, channels);
292             }
293         }
294
295         updateThingType();
296     }
297
298     @Override
299     protected void updateThingStatus(boolean messageReceived, boolean availabilityTopicsSeen) {
300         if (!messageReceived || availabilityTopicsSeen) {
301             updateStatus(ThingStatus.ONLINE, ThingStatusDetail.NONE);
302         } else {
303             updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.NONE);
304         }
305     }
306
307     private void updateThingType() {
308         // if this is a dynamic type, then we update the type
309         ThingTypeUID typeID = thing.getThingTypeUID();
310         if (!MqttBindingConstants.HOMEASSISTANT_MQTT_THING.equals(typeID)) {
311             List<ChannelGroupDefinition> groupDefs;
312             List<ChannelDefinition> channelDefs;
313             synchronized (haComponents) { // sync whenever discoverComponents is started
314                 groupDefs = haComponents.values().stream().map(AbstractComponent::getGroupDefinition)
315                         .collect(Collectors.toList());
316                 channelDefs = haComponents.values().stream().map(AbstractComponent::type)
317                         .map(ChannelGroupType::getChannelDefinitions).flatMap(List::stream)
318                         .collect(Collectors.toList());
319             }
320             ThingType thingType = channelTypeProvider.derive(typeID, MqttBindingConstants.HOMEASSISTANT_MQTT_THING)
321                     .withChannelDefinitions(channelDefs).withChannelGroupDefinitions(groupDefs).build();
322
323             channelTypeProvider.setThingType(typeID, thingType);
324         }
325     }
326 }