]> git.basschouten.com Git - openhab-addons.git/blob
6e147f20a36fb90f267e7f92d133b0eed830a6c5
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2022 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.mqtt.homeassistant.internal.handler;
14
15 import java.util.Collection;
16 import java.util.HashMap;
17 import java.util.HashSet;
18 import java.util.List;
19 import java.util.Map;
20 import java.util.Optional;
21 import java.util.Set;
22 import java.util.concurrent.CompletableFuture;
23 import java.util.function.Consumer;
24 import java.util.stream.Collectors;
25
26 import org.eclipse.jdt.annotation.NonNullByDefault;
27 import org.eclipse.jdt.annotation.Nullable;
28 import org.openhab.binding.mqtt.generic.AbstractMQTTThingHandler;
29 import org.openhab.binding.mqtt.generic.ChannelState;
30 import org.openhab.binding.mqtt.generic.MqttChannelTypeProvider;
31 import org.openhab.binding.mqtt.generic.TransformationServiceProvider;
32 import org.openhab.binding.mqtt.generic.tools.DelayedBatchProcessing;
33 import org.openhab.binding.mqtt.generic.utils.FutureCollector;
34 import org.openhab.binding.mqtt.homeassistant.generic.internal.MqttBindingConstants;
35 import org.openhab.binding.mqtt.homeassistant.internal.ComponentChannel;
36 import org.openhab.binding.mqtt.homeassistant.internal.DiscoverComponents;
37 import org.openhab.binding.mqtt.homeassistant.internal.DiscoverComponents.ComponentDiscovered;
38 import org.openhab.binding.mqtt.homeassistant.internal.HaID;
39 import org.openhab.binding.mqtt.homeassistant.internal.HandlerConfiguration;
40 import org.openhab.binding.mqtt.homeassistant.internal.component.AbstractComponent;
41 import org.openhab.binding.mqtt.homeassistant.internal.component.ComponentFactory;
42 import org.openhab.binding.mqtt.homeassistant.internal.config.ChannelConfigurationTypeAdapterFactory;
43 import org.openhab.binding.mqtt.homeassistant.internal.exception.ConfigurationException;
44 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
45 import org.openhab.core.thing.Channel;
46 import org.openhab.core.thing.ChannelUID;
47 import org.openhab.core.thing.Thing;
48 import org.openhab.core.thing.ThingStatus;
49 import org.openhab.core.thing.ThingStatusDetail;
50 import org.openhab.core.thing.ThingTypeUID;
51 import org.openhab.core.thing.ThingUID;
52 import org.openhab.core.thing.type.ChannelDefinition;
53 import org.openhab.core.thing.type.ChannelGroupDefinition;
54 import org.openhab.core.thing.type.ChannelGroupType;
55 import org.openhab.core.thing.type.ThingType;
56 import org.openhab.core.thing.util.ThingHelper;
57 import org.slf4j.Logger;
58 import org.slf4j.LoggerFactory;
59
60 import com.google.gson.Gson;
61 import com.google.gson.GsonBuilder;
62
63 /**
64  * Handles HomeAssistant MQTT object things. Such an HA Object can have multiple HA Components with different instances
65  * of those Components. This handler auto-discovers all available Components and Component Instances and
66  * adds any new appearing components over time.<br>
67  * <br>
68  *
69  * The specification does not cover the case of disappearing Components. This handler doesn't as well therefore.<br>
70  * <br>
71  *
72  * A Component Instance equals a Channel Group and the Component parts equal Channels.<br>
73  * <br>
74  *
75  * If a Components configuration changes, the known ChannelGroupType and ChannelTypes are replaced with the new ones.
76  *
77  * @author David Graeff - Initial contribution
78  */
79 @NonNullByDefault
80 public class HomeAssistantThingHandler extends AbstractMQTTThingHandler
81         implements ComponentDiscovered, Consumer<List<AbstractComponent<?>>> {
82     public static final String AVAILABILITY_CHANNEL = "availability";
83
84     private final Logger logger = LoggerFactory.getLogger(HomeAssistantThingHandler.class);
85
86     protected final MqttChannelTypeProvider channelTypeProvider;
87     public final int attributeReceiveTimeout;
88     protected final DelayedBatchProcessing<AbstractComponent<?>> delayedProcessing;
89     protected final DiscoverComponents discoverComponents;
90
91     private final Gson gson;
92     protected final Map<String, AbstractComponent<?>> haComponents = new HashMap<>();
93
94     protected HandlerConfiguration config = new HandlerConfiguration();
95     private Set<HaID> discoveryHomeAssistantIDs = new HashSet<>();
96
97     protected final TransformationServiceProvider transformationServiceProvider;
98
99     private boolean started;
100
101     /**
102      * Create a new thing handler for HomeAssistant MQTT components.
103      * A channel type provider and a topic value receive timeout must be provided.
104      *
105      * @param thing The thing of this handler
106      * @param channelTypeProvider A channel type provider
107      * @param subscribeTimeout Timeout for the entire tree parsing and subscription. In milliseconds.
108      * @param attributeReceiveTimeout The timeout per attribute field subscription. In milliseconds.
109      */
110     public HomeAssistantThingHandler(Thing thing, MqttChannelTypeProvider channelTypeProvider,
111             TransformationServiceProvider transformationServiceProvider, int subscribeTimeout,
112             int attributeReceiveTimeout) {
113         super(thing, subscribeTimeout);
114         this.gson = new GsonBuilder().registerTypeAdapterFactory(new ChannelConfigurationTypeAdapterFactory()).create();
115         this.channelTypeProvider = channelTypeProvider;
116         this.transformationServiceProvider = transformationServiceProvider;
117         this.attributeReceiveTimeout = attributeReceiveTimeout;
118         this.delayedProcessing = new DelayedBatchProcessing<>(attributeReceiveTimeout, this, scheduler);
119         this.discoverComponents = new DiscoverComponents(thing.getUID(), scheduler, this, this, gson,
120                 this.transformationServiceProvider);
121     }
122
123     @SuppressWarnings({ "null", "unused" })
124     @Override
125     public void initialize() {
126         started = false;
127
128         config = getConfigAs(HandlerConfiguration.class);
129         if (config.topics == null || config.topics.isEmpty()) {
130             updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_ERROR, "Device topics unknown");
131             return;
132         }
133         discoveryHomeAssistantIDs.addAll(HaID.fromConfig(config));
134
135         for (Channel channel : thing.getChannels()) {
136             final String groupID = channel.getUID().getGroupId();
137             if (groupID == null) {
138                 logger.warn("Channel {} has no groupd ID", channel.getLabel());
139                 continue;
140             }
141             // Already restored component?
142             @Nullable
143             AbstractComponent<?> component = haComponents.get(groupID);
144             if (component != null) {
145                 // the types may have been removed in dispose() so we need to add them again
146                 component.addChannelTypes(channelTypeProvider);
147                 continue;
148             }
149
150             HaID haID = HaID.fromConfig(config.basetopic, channel.getConfiguration());
151             discoveryHomeAssistantIDs.add(haID);
152             ThingUID thingUID = channel.getUID().getThingUID();
153             String channelConfigurationJSON = (String) channel.getConfiguration().get("config");
154             if (channelConfigurationJSON == null) {
155                 logger.warn("Provided channel does not have a 'config' configuration key!");
156             } else {
157                 try {
158                     component = ComponentFactory.createComponent(thingUID, haID, channelConfigurationJSON, this, this,
159                             scheduler, gson, transformationServiceProvider);
160                     haComponents.put(component.getGroupUID().getId(), component);
161                     component.addChannelTypes(channelTypeProvider);
162                 } catch (ConfigurationException e) {
163                     logger.error("Cannot not restore component {}: {}", thing, e.getMessage());
164                 }
165             }
166         }
167         updateThingType();
168
169         super.initialize();
170     }
171
172     @Override
173     public void dispose() {
174         // super.dispose() calls stop()
175         super.dispose();
176         haComponents.values().forEach(c -> c.removeChannelTypes(channelTypeProvider));
177     }
178
179     @Override
180     public CompletableFuture<Void> unsubscribeAll() {
181         // already unsubscribed everything by calling stop()
182         return CompletableFuture.allOf();
183     }
184
185     /**
186      * Start a background discovery for the configured HA MQTT object-id.
187      */
188     @Override
189     protected CompletableFuture<@Nullable Void> start(MqttBrokerConnection connection) {
190         started = true;
191
192         connection.setQos(1);
193         updateStatus(ThingStatus.UNKNOWN);
194
195         // Start all known components and channels within the components and put the Thing offline
196         // if any subscribing failed ( == broker connection lost)
197         CompletableFuture<@Nullable Void> future = CompletableFuture.allOf(super.start(connection),
198                 haComponents.values().parallelStream().map(e -> e.start(connection, scheduler, attributeReceiveTimeout))
199                         .reduce(CompletableFuture.completedFuture(null), (a, v) -> a.thenCompose(b -> v)) // reduce to
200                                                                                                           // one
201                         .exceptionally(e -> {
202                             updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_ERROR, e.getMessage());
203                             return null;
204                         }));
205
206         return future
207                 .thenCompose(b -> discoverComponents.startDiscovery(connection, 0, discoveryHomeAssistantIDs, this));
208     }
209
210     @Override
211     protected void stop() {
212         if (started) {
213             discoverComponents.stopDiscovery();
214             delayedProcessing.join();
215             // haComponents does not need to be synchronised -> the discovery thread is disabled
216             haComponents.values().parallelStream().map(AbstractComponent::stop) //
217                     // we need to join all the stops, otherwise they might not be done when start is called
218                     .collect(FutureCollector.allOf()).join();
219
220             started = false;
221         }
222         super.stop();
223     }
224
225     @SuppressWarnings({ "null", "unused" })
226     @Override
227     public @Nullable ChannelState getChannelState(ChannelUID channelUID) {
228         String groupID = channelUID.getGroupId();
229         if (groupID == null) {
230             return null;
231         }
232         AbstractComponent<?> component;
233         synchronized (haComponents) { // sync whenever discoverComponents is started
234             component = haComponents.get(groupID);
235         }
236         if (component == null) {
237             return null;
238         }
239         ComponentChannel componentChannel = component.getChannel(channelUID.getIdWithoutGroup());
240         if (componentChannel == null) {
241             return null;
242         }
243         return componentChannel.getState();
244     }
245
246     /**
247      * Callback of {@link DiscoverComponents}. Add to a delayed batch processor.
248      */
249     @Override
250     public void componentDiscovered(HaID homeAssistantTopicID, AbstractComponent<?> component) {
251         delayedProcessing.accept(component);
252     }
253
254     /**
255      * Callback of {@link DelayedBatchProcessing}.
256      * Add all newly discovered components to the Thing and start the components.
257      */
258     @SuppressWarnings("null")
259     @Override
260     public void accept(List<AbstractComponent<?>> discoveredComponentsList) {
261         MqttBrokerConnection connection = this.connection;
262         if (connection == null) {
263             return;
264         }
265
266         synchronized (haComponents) { // sync whenever discoverComponents is started
267             for (AbstractComponent<?> discovered : discoveredComponentsList) {
268                 AbstractComponent<?> known = haComponents.get(discovered.getGroupUID().getId());
269                 // Is component already known?
270                 if (known != null) {
271                     if (discovered.getConfigHash() != known.getConfigHash()) {
272                         // Don't wait for the future to complete. We are also not interested in failures.
273                         // The component will be replaced in a moment.
274                         known.stop();
275                     } else {
276                         known.setConfigSeen();
277                         continue;
278                     }
279                 }
280
281                 // Add channel and group types to the types registry
282                 discovered.addChannelTypes(channelTypeProvider);
283                 // Add component to the component map
284                 haComponents.put(discovered.getGroupUID().getId(), discovered);
285                 // Start component / Subscribe to channel topics
286                 discovered.start(connection, scheduler, 0).exceptionally(e -> {
287                     logger.warn("Failed to start component {}", discovered.getGroupUID(), e);
288                     return null;
289                 });
290
291                 Collection<Channel> channels = discovered.getChannelMap().values().stream()
292                         .map(ComponentChannel::getChannel).collect(Collectors.toList());
293                 ThingHelper.addChannelsToThing(thing, channels);
294             }
295         }
296
297         updateThingType();
298     }
299
300     @Override
301     protected void updateThingStatus(boolean messageReceived, Optional<Boolean> availabilityTopicsSeen) {
302         if (availabilityTopicsSeen.orElse(messageReceived)) {
303             updateStatus(ThingStatus.ONLINE, ThingStatusDetail.NONE);
304         } else {
305             updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.NONE);
306         }
307     }
308
309     private void updateThingType() {
310         // if this is a dynamic type, then we update the type
311         ThingTypeUID typeID = thing.getThingTypeUID();
312         if (!MqttBindingConstants.HOMEASSISTANT_MQTT_THING.equals(typeID)) {
313             List<ChannelGroupDefinition> groupDefs;
314             List<ChannelDefinition> channelDefs;
315             synchronized (haComponents) { // sync whenever discoverComponents is started
316                 groupDefs = haComponents.values().stream().map(AbstractComponent::getGroupDefinition)
317                         .collect(Collectors.toList());
318                 channelDefs = haComponents.values().stream().map(AbstractComponent::getType)
319                         .map(ChannelGroupType::getChannelDefinitions).flatMap(List::stream)
320                         .collect(Collectors.toList());
321             }
322             ThingType thingType = channelTypeProvider.derive(typeID, MqttBindingConstants.HOMEASSISTANT_MQTT_THING)
323                     .withChannelDefinitions(channelDefs).withChannelGroupDefinitions(groupDefs).build();
324
325             channelTypeProvider.setThingType(typeID, thingType);
326         }
327     }
328 }