]> git.basschouten.com Git - openhab-addons.git/blob
14612c5f67924c2d9efbc3bfa1aaa88f2f6127f5
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2023 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.mqtt.homeassistant.internal.handler;
14
15 import java.util.ArrayList;
16 import java.util.Comparator;
17 import java.util.HashMap;
18 import java.util.HashSet;
19 import java.util.List;
20 import java.util.Map;
21 import java.util.Optional;
22 import java.util.Set;
23 import java.util.concurrent.CompletableFuture;
24 import java.util.function.Consumer;
25 import java.util.stream.Collectors;
26
27 import org.eclipse.jdt.annotation.NonNullByDefault;
28 import org.eclipse.jdt.annotation.Nullable;
29 import org.openhab.binding.mqtt.generic.AbstractMQTTThingHandler;
30 import org.openhab.binding.mqtt.generic.ChannelState;
31 import org.openhab.binding.mqtt.generic.MqttChannelTypeProvider;
32 import org.openhab.binding.mqtt.generic.TransformationServiceProvider;
33 import org.openhab.binding.mqtt.generic.tools.DelayedBatchProcessing;
34 import org.openhab.binding.mqtt.generic.utils.FutureCollector;
35 import org.openhab.binding.mqtt.homeassistant.generic.internal.MqttBindingConstants;
36 import org.openhab.binding.mqtt.homeassistant.internal.ComponentChannel;
37 import org.openhab.binding.mqtt.homeassistant.internal.DiscoverComponents;
38 import org.openhab.binding.mqtt.homeassistant.internal.DiscoverComponents.ComponentDiscovered;
39 import org.openhab.binding.mqtt.homeassistant.internal.HaID;
40 import org.openhab.binding.mqtt.homeassistant.internal.HandlerConfiguration;
41 import org.openhab.binding.mqtt.homeassistant.internal.component.AbstractComponent;
42 import org.openhab.binding.mqtt.homeassistant.internal.component.ComponentFactory;
43 import org.openhab.binding.mqtt.homeassistant.internal.config.ChannelConfigurationTypeAdapterFactory;
44 import org.openhab.binding.mqtt.homeassistant.internal.exception.ConfigurationException;
45 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
46 import org.openhab.core.thing.Channel;
47 import org.openhab.core.thing.ChannelUID;
48 import org.openhab.core.thing.Thing;
49 import org.openhab.core.thing.ThingStatus;
50 import org.openhab.core.thing.ThingStatusDetail;
51 import org.openhab.core.thing.ThingTypeUID;
52 import org.openhab.core.thing.ThingUID;
53 import org.openhab.core.thing.binding.builder.ThingBuilder;
54 import org.openhab.core.thing.type.ChannelDefinition;
55 import org.openhab.core.thing.type.ChannelGroupDefinition;
56 import org.openhab.core.thing.type.ChannelGroupType;
57 import org.openhab.core.thing.type.ThingType;
58 import org.openhab.core.thing.util.ThingHelper;
59 import org.slf4j.Logger;
60 import org.slf4j.LoggerFactory;
61
62 import com.google.gson.Gson;
63 import com.google.gson.GsonBuilder;
64
65 /**
66  * Handles HomeAssistant MQTT object things. Such an HA Object can have multiple HA Components with different instances
67  * of those Components. This handler auto-discovers all available Components and Component Instances and
68  * adds any new appearing components over time.<br>
69  * <br>
70  *
 * The specification does not cover the case of disappearing Components, and therefore neither does this handler.<br>
72  * <br>
73  *
74  * A Component Instance equals a Channel Group and the Component parts equal Channels.<br>
75  * <br>
76  *
 * If a Component's configuration changes, the known ChannelGroupType and ChannelTypes are replaced with the new ones.
78  *
79  * @author David Graeff - Initial contribution
80  */
81 @NonNullByDefault
82 public class HomeAssistantThingHandler extends AbstractMQTTThingHandler
83         implements ComponentDiscovered, Consumer<List<AbstractComponent<?>>> {
84     public static final String AVAILABILITY_CHANNEL = "availability";
85     private static final Comparator<Channel> CHANNEL_COMPARATOR_BY_UID = Comparator
86             .comparing(channel -> channel.getUID().toString());;
87
88     private final Logger logger = LoggerFactory.getLogger(HomeAssistantThingHandler.class);
89
90     protected final MqttChannelTypeProvider channelTypeProvider;
91     public final int attributeReceiveTimeout;
92     protected final DelayedBatchProcessing<AbstractComponent<?>> delayedProcessing;
93     protected final DiscoverComponents discoverComponents;
94
95     private final Gson gson;
96     protected final Map<String, AbstractComponent<?>> haComponents = new HashMap<>();
97
98     protected HandlerConfiguration config = new HandlerConfiguration();
99     private Set<HaID> discoveryHomeAssistantIDs = new HashSet<>();
100
101     protected final TransformationServiceProvider transformationServiceProvider;
102
103     private boolean started;
104
105     /**
106      * Create a new thing handler for HomeAssistant MQTT components.
107      * A channel type provider and a topic value receive timeout must be provided.
108      *
109      * @param thing The thing of this handler
110      * @param channelTypeProvider A channel type provider
111      * @param subscribeTimeout Timeout for the entire tree parsing and subscription. In milliseconds.
112      * @param attributeReceiveTimeout The timeout per attribute field subscription. In milliseconds.
113      */
114     public HomeAssistantThingHandler(Thing thing, MqttChannelTypeProvider channelTypeProvider,
115             TransformationServiceProvider transformationServiceProvider, int subscribeTimeout,
116             int attributeReceiveTimeout) {
117         super(thing, subscribeTimeout);
118         this.gson = new GsonBuilder().registerTypeAdapterFactory(new ChannelConfigurationTypeAdapterFactory()).create();
119         this.channelTypeProvider = channelTypeProvider;
120         this.transformationServiceProvider = transformationServiceProvider;
121         this.attributeReceiveTimeout = attributeReceiveTimeout;
122         this.delayedProcessing = new DelayedBatchProcessing<>(attributeReceiveTimeout, this, scheduler);
123         this.discoverComponents = new DiscoverComponents(thing.getUID(), scheduler, this, this, gson,
124                 this.transformationServiceProvider);
125     }
126
127     @Override
128     public void initialize() {
129         started = false;
130
131         config = getConfigAs(HandlerConfiguration.class);
132         if (config.topics.isEmpty()) {
133             updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_ERROR, "Device topics unknown");
134             return;
135         }
136         discoveryHomeAssistantIDs.addAll(HaID.fromConfig(config));
137
138         for (Channel channel : thing.getChannels()) {
139             final String groupID = channel.getUID().getGroupId();
140             if (groupID == null) {
141                 logger.warn("Channel {} has no groupd ID", channel.getLabel());
142                 continue;
143             }
144             // Already restored component?
145             @Nullable
146             AbstractComponent<?> component = haComponents.get(groupID);
147             if (component != null) {
148                 // the types may have been removed in dispose() so we need to add them again
149                 component.addChannelTypes(channelTypeProvider);
150                 continue;
151             }
152
153             HaID haID = HaID.fromConfig(config.basetopic, channel.getConfiguration());
154             discoveryHomeAssistantIDs.add(haID);
155             ThingUID thingUID = channel.getUID().getThingUID();
156             String channelConfigurationJSON = (String) channel.getConfiguration().get("config");
157             if (channelConfigurationJSON == null) {
158                 logger.warn("Provided channel does not have a 'config' configuration key!");
159             } else {
160                 try {
161                     component = ComponentFactory.createComponent(thingUID, haID, channelConfigurationJSON, this, this,
162                             scheduler, gson, transformationServiceProvider);
163                     haComponents.put(component.getGroupUID().getId(), component);
164                     component.addChannelTypes(channelTypeProvider);
165                 } catch (ConfigurationException e) {
166                     logger.error("Cannot not restore component {}: {}", thing, e.getMessage());
167                 }
168             }
169         }
170         updateThingType();
171
172         super.initialize();
173     }
174
175     @Override
176     public void dispose() {
177         // super.dispose() calls stop()
178         super.dispose();
179         haComponents.values().forEach(c -> c.removeChannelTypes(channelTypeProvider));
180     }
181
182     @Override
183     public CompletableFuture<Void> unsubscribeAll() {
184         // already unsubscribed everything by calling stop()
185         return CompletableFuture.allOf();
186     }
187
188     /**
189      * Start a background discovery for the configured HA MQTT object-id.
190      */
191     @Override
192     protected CompletableFuture<@Nullable Void> start(MqttBrokerConnection connection) {
193         started = true;
194
195         connection.setQos(1);
196         updateStatus(ThingStatus.UNKNOWN);
197
198         // Start all known components and channels within the components and put the Thing offline
199         // if any subscribing failed ( == broker connection lost)
200         CompletableFuture<@Nullable Void> future = CompletableFuture.allOf(super.start(connection),
201                 haComponents.values().stream().map(e -> e.start(connection, scheduler, attributeReceiveTimeout))
202                         .reduce(CompletableFuture.completedFuture(null), (a, v) -> a.thenCompose(b -> v)) // reduce to
203                                                                                                           // one
204                         .exceptionally(e -> {
205                             updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_ERROR, e.getMessage());
206                             return null;
207                         }));
208
209         return future
210                 .thenCompose(b -> discoverComponents.startDiscovery(connection, 0, discoveryHomeAssistantIDs, this));
211     }
212
213     @Override
214     protected void stop() {
215         if (started) {
216             discoverComponents.stopDiscovery();
217             delayedProcessing.join();
218             // haComponents does not need to be synchronised -> the discovery thread is disabled
219             haComponents.values().stream().map(AbstractComponent::stop) //
220                     // we need to join all the stops, otherwise they might not be done when start is called
221                     .collect(FutureCollector.allOf()).join();
222
223             started = false;
224         }
225         super.stop();
226     }
227
228     @Override
229     public @Nullable ChannelState getChannelState(ChannelUID channelUID) {
230         String groupID = channelUID.getGroupId();
231         if (groupID == null) {
232             return null;
233         }
234         AbstractComponent<?> component;
235         synchronized (haComponents) { // sync whenever discoverComponents is started
236             component = haComponents.get(groupID);
237         }
238         if (component == null) {
239             return null;
240         }
241         ComponentChannel componentChannel = component.getChannel(channelUID.getIdWithoutGroup());
242         if (componentChannel == null) {
243             return null;
244         }
245         return componentChannel.getState();
246     }
247
248     /**
249      * Callback of {@link DiscoverComponents}. Add to a delayed batch processor.
250      */
251     @Override
252     public void componentDiscovered(HaID homeAssistantTopicID, AbstractComponent<?> component) {
253         delayedProcessing.accept(component);
254     }
255
256     /**
257      * Callback of {@link DelayedBatchProcessing}.
258      * Add all newly discovered components to the Thing and start the components.
259      */
260     @Override
261     public void accept(List<AbstractComponent<?>> discoveredComponentsList) {
262         MqttBrokerConnection connection = this.connection;
263         if (connection == null) {
264             return;
265         }
266
267         synchronized (haComponents) { // sync whenever discoverComponents is started
268             for (AbstractComponent<?> discovered : discoveredComponentsList) {
269                 AbstractComponent<?> known = haComponents.get(discovered.getGroupUID().getId());
270                 // Is component already known?
271                 if (known != null) {
272                     if (discovered.getConfigHash() != known.getConfigHash()) {
273                         // Don't wait for the future to complete. We are also not interested in failures.
274                         // The component will be replaced in a moment.
275                         known.stop();
276                     } else {
277                         known.setConfigSeen();
278                         continue;
279                     }
280                 }
281
282                 // Add channel and group types to the types registry
283                 discovered.addChannelTypes(channelTypeProvider);
284                 // Add component to the component map
285                 haComponents.put(discovered.getGroupUID().getId(), discovered);
286                 // Start component / Subscribe to channel topics
287                 discovered.start(connection, scheduler, 0).exceptionally(e -> {
288                     logger.warn("Failed to start component {}", discovered.getGroupUID(), e);
289                     return null;
290                 });
291
292                 List<Channel> discoveredChannels = discovered.getChannelMap().values().stream()
293                         .map(ComponentChannel::getChannel).collect(Collectors.toList());
294                 if (known != null) {
295                     // We had previously known component with different config hash
296                     // We remove all conflicting old channels, they will be re-added below based on the new discovery
297                     logger.debug(
298                             "Received component {} with slightly different config. Making sure we re-create conflicting channels...",
299                             discovered.getGroupUID());
300                     removeJustRediscoveredChannels(discoveredChannels);
301                 }
302
303                 // Add newly discovered channels. We sort the channels
304                 // for (mostly) consistent jsondb serialization
305                 discoveredChannels.sort(CHANNEL_COMPARATOR_BY_UID);
306                 ThingHelper.addChannelsToThing(thing, discoveredChannels);
307             }
308             updateThingType();
309         }
310     }
311
312     private void removeJustRediscoveredChannels(List<Channel> discoveredChannels) {
313         ArrayList<Channel> mutableChannels = new ArrayList<>(getThing().getChannels());
314         Set<ChannelUID> newChannelUIDs = discoveredChannels.stream().map(Channel::getUID).collect(Collectors.toSet());
315         // Take current channels but remove those channels that were just re-discovered
316         List<Channel> existingChannelsWithNewlyDiscoveredChannelsRemoved = mutableChannels.stream()
317                 .filter(existingChannel -> !newChannelUIDs.contains(existingChannel.getUID()))
318                 .collect(Collectors.toList());
319         if (existingChannelsWithNewlyDiscoveredChannelsRemoved.size() < mutableChannels.size()) {
320             // We sort the channels for (mostly) consistent jsondb serialization
321             existingChannelsWithNewlyDiscoveredChannelsRemoved.sort(CHANNEL_COMPARATOR_BY_UID);
322             updateThingChannels(existingChannelsWithNewlyDiscoveredChannelsRemoved);
323         }
324     }
325
326     private void updateThingChannels(List<Channel> channelList) {
327         ThingBuilder thingBuilder = editThing();
328         thingBuilder.withChannels(channelList);
329         updateThing(thingBuilder.build());
330     }
331
332     @Override
333     protected void updateThingStatus(boolean messageReceived, Optional<Boolean> availabilityTopicsSeen) {
334         if (availabilityTopicsSeen.orElse(messageReceived)) {
335             updateStatus(ThingStatus.ONLINE, ThingStatusDetail.NONE);
336         } else {
337             updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.NONE);
338         }
339     }
340
341     private void updateThingType() {
342         // if this is a dynamic type, then we update the type
343         ThingTypeUID typeID = thing.getThingTypeUID();
344         if (!MqttBindingConstants.HOMEASSISTANT_MQTT_THING.equals(typeID)) {
345             List<ChannelGroupDefinition> groupDefs;
346             List<ChannelDefinition> channelDefs;
347             synchronized (haComponents) { // sync whenever discoverComponents is started
348                 groupDefs = haComponents.values().stream().map(AbstractComponent::getGroupDefinition)
349                         .collect(Collectors.toList());
350                 channelDefs = haComponents.values().stream().map(AbstractComponent::getType)
351                         .map(ChannelGroupType::getChannelDefinitions).flatMap(List::stream)
352                         .collect(Collectors.toList());
353             }
354             ThingType thingType = channelTypeProvider.derive(typeID, MqttBindingConstants.HOMEASSISTANT_MQTT_THING)
355                     .withChannelDefinitions(channelDefs).withChannelGroupDefinitions(groupDefs).build();
356
357             channelTypeProvider.setThingType(typeID, thingType);
358         }
359     }
360 }