]> git.basschouten.com Git - openhab-addons.git/blob
99a14d4abd9c4fe3b55d73e121d152f1d1c72dd8
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2023 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.mqtt.homeassistant.internal.handler;
14
15 import java.util.ArrayList;
16 import java.util.Comparator;
17 import java.util.HashMap;
18 import java.util.HashSet;
19 import java.util.List;
20 import java.util.Map;
21 import java.util.Objects;
22 import java.util.Optional;
23 import java.util.Set;
24 import java.util.concurrent.CompletableFuture;
25 import java.util.function.Consumer;
26 import java.util.stream.Collectors;
27
28 import org.eclipse.jdt.annotation.NonNullByDefault;
29 import org.eclipse.jdt.annotation.Nullable;
30 import org.openhab.binding.mqtt.generic.AbstractMQTTThingHandler;
31 import org.openhab.binding.mqtt.generic.ChannelState;
32 import org.openhab.binding.mqtt.generic.MqttChannelTypeProvider;
33 import org.openhab.binding.mqtt.generic.TransformationServiceProvider;
34 import org.openhab.binding.mqtt.generic.tools.DelayedBatchProcessing;
35 import org.openhab.binding.mqtt.generic.utils.FutureCollector;
36 import org.openhab.binding.mqtt.homeassistant.generic.internal.MqttBindingConstants;
37 import org.openhab.binding.mqtt.homeassistant.internal.ComponentChannel;
38 import org.openhab.binding.mqtt.homeassistant.internal.DiscoverComponents;
39 import org.openhab.binding.mqtt.homeassistant.internal.DiscoverComponents.ComponentDiscovered;
40 import org.openhab.binding.mqtt.homeassistant.internal.HaID;
41 import org.openhab.binding.mqtt.homeassistant.internal.HandlerConfiguration;
42 import org.openhab.binding.mqtt.homeassistant.internal.component.AbstractComponent;
43 import org.openhab.binding.mqtt.homeassistant.internal.component.ComponentFactory;
44 import org.openhab.binding.mqtt.homeassistant.internal.config.ChannelConfigurationTypeAdapterFactory;
45 import org.openhab.binding.mqtt.homeassistant.internal.exception.ConfigurationException;
46 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
47 import org.openhab.core.thing.Channel;
48 import org.openhab.core.thing.ChannelGroupUID;
49 import org.openhab.core.thing.ChannelUID;
50 import org.openhab.core.thing.Thing;
51 import org.openhab.core.thing.ThingStatus;
52 import org.openhab.core.thing.ThingStatusDetail;
53 import org.openhab.core.thing.ThingTypeUID;
54 import org.openhab.core.thing.ThingUID;
55 import org.openhab.core.thing.binding.builder.ThingBuilder;
56 import org.openhab.core.thing.type.ChannelDefinition;
57 import org.openhab.core.thing.type.ChannelGroupDefinition;
58 import org.openhab.core.thing.type.ThingType;
59 import org.openhab.core.thing.util.ThingHelper;
60 import org.slf4j.Logger;
61 import org.slf4j.LoggerFactory;
62
63 import com.google.gson.Gson;
64 import com.google.gson.GsonBuilder;
65
66 /**
67  * Handles HomeAssistant MQTT object things. Such an HA Object can have multiple HA Components with different instances
68  * of those Components. This handler auto-discovers all available Components and Component Instances and
69  * adds any new appearing components over time.<br>
70  * <br>
71  *
72  * The specification does not cover the case of disappearing Components. This handler doesn't as well therefore.<br>
73  * <br>
74  *
75  * A Component Instance equals a Channel Group and the Component parts equal Channels.<br>
76  * <br>
77  *
78  * If a Components configuration changes, the known ChannelGroupType and ChannelTypes are replaced with the new ones.
79  *
80  * @author David Graeff - Initial contribution
81  */
82 @NonNullByDefault
83 public class HomeAssistantThingHandler extends AbstractMQTTThingHandler
84         implements ComponentDiscovered, Consumer<List<AbstractComponent<?>>> {
85     public static final String AVAILABILITY_CHANNEL = "availability";
86     private static final Comparator<Channel> CHANNEL_COMPARATOR_BY_UID = Comparator
87             .comparing(channel -> channel.getUID().toString());;
88
89     private final Logger logger = LoggerFactory.getLogger(HomeAssistantThingHandler.class);
90
91     protected final MqttChannelTypeProvider channelTypeProvider;
92     public final int attributeReceiveTimeout;
93     protected final DelayedBatchProcessing<AbstractComponent<?>> delayedProcessing;
94     protected final DiscoverComponents discoverComponents;
95
96     private final Gson gson;
97     protected final Map<@Nullable String, AbstractComponent<?>> haComponents = new HashMap<>();
98
99     protected HandlerConfiguration config = new HandlerConfiguration();
100     private Set<HaID> discoveryHomeAssistantIDs = new HashSet<>();
101
102     protected final TransformationServiceProvider transformationServiceProvider;
103
104     private boolean started;
105
106     /**
107      * Create a new thing handler for HomeAssistant MQTT components.
108      * A channel type provider and a topic value receive timeout must be provided.
109      *
110      * @param thing The thing of this handler
111      * @param channelTypeProvider A channel type provider
112      * @param subscribeTimeout Timeout for the entire tree parsing and subscription. In milliseconds.
113      * @param attributeReceiveTimeout The timeout per attribute field subscription. In milliseconds.
114      */
115     public HomeAssistantThingHandler(Thing thing, MqttChannelTypeProvider channelTypeProvider,
116             TransformationServiceProvider transformationServiceProvider, int subscribeTimeout,
117             int attributeReceiveTimeout) {
118         super(thing, subscribeTimeout);
119         this.gson = new GsonBuilder().registerTypeAdapterFactory(new ChannelConfigurationTypeAdapterFactory()).create();
120         this.channelTypeProvider = channelTypeProvider;
121         this.transformationServiceProvider = transformationServiceProvider;
122         this.attributeReceiveTimeout = attributeReceiveTimeout;
123         this.delayedProcessing = new DelayedBatchProcessing<>(attributeReceiveTimeout, this, scheduler);
124         this.discoverComponents = new DiscoverComponents(thing.getUID(), scheduler, this, this, gson,
125                 this.transformationServiceProvider);
126     }
127
128     @Override
129     public void initialize() {
130         started = false;
131
132         config = getConfigAs(HandlerConfiguration.class);
133         if (config.topics.isEmpty()) {
134             updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_ERROR, "Device topics unknown");
135             return;
136         }
137         discoveryHomeAssistantIDs.addAll(HaID.fromConfig(config));
138
139         for (Channel channel : thing.getChannels()) {
140             final String groupID = channel.getUID().getGroupId();
141             // Already restored component?
142             @Nullable
143             AbstractComponent<?> component = haComponents.get(groupID);
144             if (component != null) {
145                 // the types may have been removed in dispose() so we need to add them again
146                 component.addChannelTypes(channelTypeProvider);
147                 continue;
148             }
149
150             HaID haID = HaID.fromConfig(config.basetopic, channel.getConfiguration());
151             discoveryHomeAssistantIDs.add(haID);
152             ThingUID thingUID = channel.getUID().getThingUID();
153             String channelConfigurationJSON = (String) channel.getConfiguration().get("config");
154             if (channelConfigurationJSON == null) {
155                 logger.warn("Provided channel does not have a 'config' configuration key!");
156             } else {
157                 try {
158                     component = ComponentFactory.createComponent(thingUID, haID, channelConfigurationJSON, this, this,
159                             scheduler, gson, transformationServiceProvider);
160                     final ChannelGroupUID groupUID = component.getGroupUID();
161                     String id = null;
162                     if (groupUID != null) {
163                         id = groupUID.getId();
164                     }
165                     haComponents.put(id, component);
166                     component.addChannelTypes(channelTypeProvider);
167                 } catch (ConfigurationException e) {
168                     logger.error("Cannot not restore component {}: {}", thing, e.getMessage());
169                 }
170             }
171         }
172         updateThingType();
173
174         super.initialize();
175     }
176
177     @Override
178     public void dispose() {
179         // super.dispose() calls stop()
180         super.dispose();
181         haComponents.values().forEach(c -> c.removeChannelTypes(channelTypeProvider));
182     }
183
184     @Override
185     public CompletableFuture<Void> unsubscribeAll() {
186         // already unsubscribed everything by calling stop()
187         return CompletableFuture.allOf();
188     }
189
190     /**
191      * Start a background discovery for the configured HA MQTT object-id.
192      */
193     @Override
194     protected CompletableFuture<@Nullable Void> start(MqttBrokerConnection connection) {
195         started = true;
196
197         connection.setQos(1);
198         updateStatus(ThingStatus.UNKNOWN);
199
200         // Start all known components and channels within the components and put the Thing offline
201         // if any subscribing failed ( == broker connection lost)
202         CompletableFuture<@Nullable Void> future = CompletableFuture.allOf(super.start(connection),
203                 haComponents.values().stream().map(e -> e.start(connection, scheduler, attributeReceiveTimeout))
204                         .reduce(CompletableFuture.completedFuture(null), (a, v) -> a.thenCompose(b -> v)) // reduce to
205                                                                                                           // one
206                         .exceptionally(e -> {
207                             updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_ERROR, e.getMessage());
208                             return null;
209                         }));
210
211         return future
212                 .thenCompose(b -> discoverComponents.startDiscovery(connection, 0, discoveryHomeAssistantIDs, this));
213     }
214
215     @Override
216     protected void stop() {
217         if (started) {
218             discoverComponents.stopDiscovery();
219             delayedProcessing.join();
220             // haComponents does not need to be synchronised -> the discovery thread is disabled
221             haComponents.values().stream().map(AbstractComponent::stop) //
222                     // we need to join all the stops, otherwise they might not be done when start is called
223                     .collect(FutureCollector.allOf()).join();
224
225             started = false;
226         }
227         super.stop();
228     }
229
230     @Override
231     public @Nullable ChannelState getChannelState(ChannelUID channelUID) {
232         String groupID = channelUID.getGroupId();
233         AbstractComponent<?> component;
234         synchronized (haComponents) { // sync whenever discoverComponents is started
235             component = haComponents.get(groupID);
236         }
237         if (component == null) {
238             return null;
239         }
240         ComponentChannel componentChannel = component.getChannel(channelUID.getIdWithoutGroup());
241         if (componentChannel == null) {
242             return null;
243         }
244         return componentChannel.getState();
245     }
246
247     /**
248      * Callback of {@link DiscoverComponents}. Add to a delayed batch processor.
249      */
250     @Override
251     public void componentDiscovered(HaID homeAssistantTopicID, AbstractComponent<?> component) {
252         delayedProcessing.accept(component);
253     }
254
255     /**
256      * Callback of {@link DelayedBatchProcessing}.
257      * Add all newly discovered components to the Thing and start the components.
258      */
259     @Override
260     public void accept(List<AbstractComponent<?>> discoveredComponentsList) {
261         MqttBrokerConnection connection = this.connection;
262         if (connection == null) {
263             return;
264         }
265
266         synchronized (haComponents) { // sync whenever discoverComponents is started
267             for (AbstractComponent<?> discovered : discoveredComponentsList) {
268                 final ChannelGroupUID groupUID = discovered.getGroupUID();
269                 String id = null;
270                 if (groupUID != null) {
271                     id = groupUID.getId();
272                 }
273                 AbstractComponent<?> known = haComponents.get(id);
274                 // Is component already known?
275                 if (known != null) {
276                     if (discovered.getConfigHash() != known.getConfigHash()) {
277                         // Don't wait for the future to complete. We are also not interested in failures.
278                         // The component will be replaced in a moment.
279                         known.stop();
280                     } else {
281                         known.setConfigSeen();
282                         continue;
283                     }
284                 }
285
286                 // Add channel and group types to the types registry
287                 discovered.addChannelTypes(channelTypeProvider);
288                 // Add component to the component map
289                 haComponents.put(id, discovered);
290                 // Start component / Subscribe to channel topics
291                 discovered.start(connection, scheduler, 0).exceptionally(e -> {
292                     logger.warn("Failed to start component {}", discovered.getHaID(), e);
293                     return null;
294                 });
295
296                 List<Channel> discoveredChannels = discovered.getChannelMap().values().stream()
297                         .map(ComponentChannel::getChannel).collect(Collectors.toList());
298                 if (known != null) {
299                     // We had previously known component with different config hash
300                     // We remove all conflicting old channels, they will be re-added below based on the new discovery
301                     logger.debug(
302                             "Received component {} with slightly different config. Making sure we re-create conflicting channels...",
303                             discovered.getHaID());
304                     removeJustRediscoveredChannels(discoveredChannels);
305                 }
306
307                 // Add newly discovered channels. We sort the channels
308                 // for (mostly) consistent jsondb serialization
309                 discoveredChannels.sort(CHANNEL_COMPARATOR_BY_UID);
310                 ThingHelper.addChannelsToThing(thing, discoveredChannels);
311             }
312             updateThingType();
313         }
314     }
315
316     private void removeJustRediscoveredChannels(List<Channel> discoveredChannels) {
317         ArrayList<Channel> mutableChannels = new ArrayList<>(getThing().getChannels());
318         Set<ChannelUID> newChannelUIDs = discoveredChannels.stream().map(Channel::getUID).collect(Collectors.toSet());
319         // Take current channels but remove those channels that were just re-discovered
320         List<Channel> existingChannelsWithNewlyDiscoveredChannelsRemoved = mutableChannels.stream()
321                 .filter(existingChannel -> !newChannelUIDs.contains(existingChannel.getUID()))
322                 .collect(Collectors.toList());
323         if (existingChannelsWithNewlyDiscoveredChannelsRemoved.size() < mutableChannels.size()) {
324             // We sort the channels for (mostly) consistent jsondb serialization
325             existingChannelsWithNewlyDiscoveredChannelsRemoved.sort(CHANNEL_COMPARATOR_BY_UID);
326             updateThingChannels(existingChannelsWithNewlyDiscoveredChannelsRemoved);
327         }
328     }
329
330     private void updateThingChannels(List<Channel> channelList) {
331         ThingBuilder thingBuilder = editThing();
332         thingBuilder.withChannels(channelList);
333         updateThing(thingBuilder.build());
334     }
335
336     @Override
337     protected void updateThingStatus(boolean messageReceived, Optional<Boolean> availabilityTopicsSeen) {
338         if (availabilityTopicsSeen.orElse(messageReceived)) {
339             updateStatus(ThingStatus.ONLINE, ThingStatusDetail.NONE);
340         } else {
341             updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.NONE);
342         }
343     }
344
345     private void updateThingType() {
346         // if this is a dynamic type, then we update the type
347         ThingTypeUID typeID = thing.getThingTypeUID();
348         if (!MqttBindingConstants.HOMEASSISTANT_MQTT_THING.equals(typeID)) {
349             List<ChannelGroupDefinition> groupDefs;
350             List<ChannelDefinition> channelDefs;
351             synchronized (haComponents) { // sync whenever discoverComponents is started
352                 groupDefs = haComponents.values().stream().map(AbstractComponent::getGroupDefinition)
353                         .filter(Objects::nonNull).map(Objects::requireNonNull).collect(Collectors.toList());
354                 channelDefs = haComponents.values().stream().map(AbstractComponent::getChannels).flatMap(List::stream)
355                         .collect(Collectors.toList());
356             }
357             ThingType thingType = channelTypeProvider.derive(typeID, MqttBindingConstants.HOMEASSISTANT_MQTT_THING)
358                     .withChannelDefinitions(channelDefs).withChannelGroupDefinitions(groupDefs).build();
359
360             channelTypeProvider.setThingType(typeID, thingType);
361         }
362     }
363 }