/**
 * Copyright (c) 2010-2024 Contributors to the openHAB project
 *
 * See the NOTICE file(s) distributed with this work for additional
 * information.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License 2.0 which is available at
 * http://www.eclipse.org/legal/epl-2.0
 *
 * SPDX-License-Identifier: EPL-2.0
 */
13 package org.openhab.binding.mqtt.homeassistant.internal.handler;
import java.net.URI;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.openhab.binding.mqtt.generic.AbstractMQTTThingHandler;
import org.openhab.binding.mqtt.generic.ChannelState;
import org.openhab.binding.mqtt.generic.MqttChannelStateDescriptionProvider;
import org.openhab.binding.mqtt.generic.MqttChannelTypeProvider;
import org.openhab.binding.mqtt.generic.tools.DelayedBatchProcessing;
import org.openhab.binding.mqtt.generic.utils.FutureCollector;
import org.openhab.binding.mqtt.homeassistant.generic.internal.MqttBindingConstants;
import org.openhab.binding.mqtt.homeassistant.internal.DiscoverComponents;
import org.openhab.binding.mqtt.homeassistant.internal.DiscoverComponents.ComponentDiscovered;
import org.openhab.binding.mqtt.homeassistant.internal.HaID;
import org.openhab.binding.mqtt.homeassistant.internal.HandlerConfiguration;
import org.openhab.binding.mqtt.homeassistant.internal.component.AbstractComponent;
import org.openhab.binding.mqtt.homeassistant.internal.component.ComponentFactory;
import org.openhab.binding.mqtt.homeassistant.internal.component.DeviceTrigger;
import org.openhab.binding.mqtt.homeassistant.internal.component.Update;
import org.openhab.binding.mqtt.homeassistant.internal.config.ChannelConfigurationTypeAdapterFactory;
import org.openhab.binding.mqtt.homeassistant.internal.exception.ConfigurationException;
import org.openhab.core.config.core.Configuration;
import org.openhab.core.config.core.validation.ConfigValidationException;
import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
import org.openhab.core.thing.Channel;
import org.openhab.core.thing.ChannelUID;
import org.openhab.core.thing.Thing;
import org.openhab.core.thing.ThingStatus;
import org.openhab.core.thing.ThingStatusDetail;
import org.openhab.core.thing.ThingTypeUID;
import org.openhab.core.thing.ThingUID;
import org.openhab.core.thing.binding.builder.ThingBuilder;
import org.openhab.core.thing.type.ChannelTypeRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.hubspot.jinjava.Jinjava;
/**
 * Handles HomeAssistant MQTT object things. Such an HA Object can have multiple HA Components with different
 * instances of those Components. This handler auto-discovers all available Components and Component Instances and
 * adds any newly appearing Components over time.<br>
 * <br>
 * The specification does not cover the case of disappearing Components, and therefore neither does this handler.<br>
 * <br>
 * A Component Instance equals a Channel Group and the Component parts equal Channels.<br>
 * <br>
 * If a Component's configuration changes, the known ChannelGroupType and ChannelTypes are replaced with the new ones.
 *
 * @author David Graeff - Initial contribution
 */
84 public class HomeAssistantThingHandler extends AbstractMQTTThingHandler
85 implements ComponentDiscovered, Consumer<List<AbstractComponent<?>>> {
86 public static final String AVAILABILITY_CHANNEL = "availability";
87 private static final Comparator<AbstractComponent<?>> COMPONENT_COMPARATOR = Comparator
88 .comparing((AbstractComponent<?> component) -> component.hasGroup())
89 .thenComparing(AbstractComponent::getName);
90 private static final URI UPDATABLE_CONFIG_DESCRIPTION_URI = URI.create("thing-type:mqtt:homeassistant-updatable");
92 private final Logger logger = LoggerFactory.getLogger(HomeAssistantThingHandler.class);
94 protected final MqttChannelTypeProvider channelTypeProvider;
95 protected final MqttChannelStateDescriptionProvider stateDescriptionProvider;
96 protected final ChannelTypeRegistry channelTypeRegistry;
97 protected final Jinjava jinjava;
98 public final int attributeReceiveTimeout;
99 protected final DelayedBatchProcessing<AbstractComponent<?>> delayedProcessing;
100 protected final DiscoverComponents discoverComponents;
102 private final Gson gson;
103 protected final Map<@Nullable String, AbstractComponent<?>> haComponents = new HashMap<>();
104 protected final Map<@Nullable String, AbstractComponent<?>> haComponentsByUniqueId = new HashMap<>();
105 protected final Map<ChannelUID, ChannelState> channelStates = new HashMap<>();
107 protected HandlerConfiguration config = new HandlerConfiguration();
108 private Set<HaID> discoveryHomeAssistantIDs = new HashSet<>();
110 private boolean started;
111 private boolean newStyleChannels;
112 private @Nullable Update updateComponent;
115 * Create a new thing handler for HomeAssistant MQTT components.
116 * A channel type provider and a topic value receive timeout must be provided.
118 * @param thing The thing of this handler
119 * @param channelTypeProvider A channel type provider
120 * @param subscribeTimeout Timeout for the entire tree parsing and subscription. In milliseconds.
121 * @param attributeReceiveTimeout The timeout per attribute field subscription. In milliseconds.
123 public HomeAssistantThingHandler(Thing thing, MqttChannelTypeProvider channelTypeProvider,
124 MqttChannelStateDescriptionProvider stateDescriptionProvider, ChannelTypeRegistry channelTypeRegistry,
125 Jinjava jinjava, int subscribeTimeout, int attributeReceiveTimeout) {
126 super(thing, subscribeTimeout);
127 this.gson = new GsonBuilder().registerTypeAdapterFactory(new ChannelConfigurationTypeAdapterFactory()).create();
128 this.channelTypeProvider = channelTypeProvider;
129 this.stateDescriptionProvider = stateDescriptionProvider;
130 this.channelTypeRegistry = channelTypeRegistry;
131 this.jinjava = jinjava;
132 this.attributeReceiveTimeout = attributeReceiveTimeout;
133 this.delayedProcessing = new DelayedBatchProcessing<>(attributeReceiveTimeout, this, scheduler);
135 newStyleChannels = "true".equals(thing.getProperties().get("newStyleChannels"));
137 this.discoverComponents = new DiscoverComponents(thing.getUID(), scheduler, this, this, gson, jinjava,
142 public void initialize() {
145 config = getConfigAs(HandlerConfiguration.class);
146 if (config.topics.isEmpty()) {
147 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_ERROR, "Device topics unknown");
150 discoveryHomeAssistantIDs.addAll(HaID.fromConfig(config));
152 ThingTypeUID typeID = getThing().getThingTypeUID();
153 for (Channel channel : thing.getChannels()) {
154 final String groupID = channel.getUID().getGroupId();
155 if (groupID != null) {
156 // Already restored component via another channel in the component?
157 AbstractComponent<?> component = haComponents.get(groupID);
158 if (component != null) {
162 Configuration multiComponentChannelConfig = channel.getConfiguration();
163 if (!multiComponentChannelConfig.containsKey("component")
164 || !multiComponentChannelConfig.containsKey("objectid")
165 || !multiComponentChannelConfig.containsKey("config")) {
166 // Must be a secondary channel
170 List<Configuration> flattenedConfig = flattenChannelConfiguration(multiComponentChannelConfig,
172 for (Configuration channelConfig : flattenedConfig) {
173 HaID haID = HaID.fromConfig(config.basetopic, channelConfig);
175 if (!config.topics.contains(haID.getTopic())) {
176 // don't add a component for this channel that isn't configured on the thing
177 // anymore. It will disappear from the thing when the thing type is updated below
181 discoveryHomeAssistantIDs.add(haID);
182 ThingUID thingUID = channel.getUID().getThingUID();
183 String channelConfigurationJSON = (String) channelConfig.get("config");
185 AbstractComponent<?> component = ComponentFactory.createComponent(thingUID, haID,
186 channelConfigurationJSON, this, this, scheduler, gson, jinjava, newStyleChannels);
187 if (typeID.equals(MqttBindingConstants.HOMEASSISTANT_MQTT_THING)) {
188 typeID = calculateThingTypeUID(component);
191 addComponent(component);
192 } catch (ConfigurationException e) {
193 logger.warn("Cannot restore component {}: {}", thing, e.getMessage());
197 if (updateThingType(typeID)) {
203 public void dispose() {
204 removeStateDescriptions();
205 // super.dispose() calls stop()
210 public CompletableFuture<Void> unsubscribeAll() {
211 // already unsubscribed everything by calling stop()
212 return CompletableFuture.allOf();
216 * Start a background discovery for the configured HA MQTT object-id.
219 protected CompletableFuture<@Nullable Void> start(MqttBrokerConnection connection) {
222 connection.setQos(1);
223 updateStatus(ThingStatus.UNKNOWN);
225 // Start all known components and channels within the components and put the Thing offline
226 // if any subscribing failed ( == broker connection lost)
227 CompletableFuture<@Nullable Void> future = CompletableFuture.allOf(super.start(connection),
228 haComponents.values().stream().map(e -> e.start(connection, scheduler, attributeReceiveTimeout))
229 .reduce(CompletableFuture.completedFuture(null), (a, v) -> a.thenCompose(b -> v)) // reduce to
231 .exceptionally(e -> {
232 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_ERROR, e.getMessage());
237 .thenCompose(b -> discoverComponents.startDiscovery(connection, 0, discoveryHomeAssistantIDs, this));
241 protected void stop() {
243 discoverComponents.stopDiscovery();
244 delayedProcessing.join();
245 // haComponents does not need to be synchronised -> the discovery thread is disabled
246 haComponents.values().stream().map(AbstractComponent::stop) //
247 // we need to join all the stops, otherwise they might not be done when start is called
248 .collect(FutureCollector.allOf()).join();
256 public @Nullable ChannelState getChannelState(ChannelUID channelUID) {
257 synchronized (haComponents) { // sync whenever discoverComponents is started
258 return channelStates.get(channelUID);
263 * Callback of {@link DiscoverComponents}. Add to a delayed batch processor.
266 public void componentDiscovered(HaID homeAssistantTopicID, AbstractComponent<?> component) {
267 delayedProcessing.accept(component);
271 * Callback of {@link DelayedBatchProcessing}.
272 * Add all newly discovered components to the Thing and start the components.
275 public void accept(List<AbstractComponent<?>> discoveredComponentsList) {
276 MqttBrokerConnection connection = this.connection;
277 if (connection == null) {
281 synchronized (haComponents) { // sync whenever discoverComponents is started
282 ThingTypeUID typeID = getThing().getThingTypeUID();
283 for (AbstractComponent<?> discovered : discoveredComponentsList) {
284 if (typeID.equals(MqttBindingConstants.HOMEASSISTANT_MQTT_THING)) {
285 typeID = calculateThingTypeUID(discovered);
287 AbstractComponent<?> known = haComponentsByUniqueId.get(discovered.getUniqueId());
288 // Is component already known?
290 if (discovered.getConfigHash() != known.getConfigHash()
291 && discovered.getUniqueId().equals(known.getUniqueId())) {
292 // Don't wait for the future to complete. We are also not interested in failures.
293 // The component will be replaced in a moment.
295 haComponentsByUniqueId.remove(discovered.getUniqueId());
296 haComponents.remove(known.getComponentId());
297 if (!known.getComponentId().equals(discovered.getComponentId())) {
298 discovered.resolveConflict();
301 known.setConfigSeen();
306 // Add component to the component map
307 addComponent(discovered);
308 // Start component / Subscribe to channel topics
309 discovered.start(connection, scheduler, 0).exceptionally(e -> {
310 logger.warn("Failed to start component {}", discovered.getHaID(), e);
314 if (discovered instanceof Update) {
315 updateComponent = (Update) discovered;
316 updateComponent.setReleaseStateUpdateListener(this::releaseStateUpdated);
319 updateThingType(typeID);
324 protected void updateThingStatus(boolean messageReceived, Optional<Boolean> availabilityTopicsSeen) {
325 if (availabilityTopicsSeen.orElse(messageReceived)) {
326 updateStatus(ThingStatus.ONLINE, ThingStatusDetail.NONE);
328 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.NONE);
333 public void handleConfigurationUpdate(Map<String, Object> configurationParameters)
334 throws ConfigValidationException {
335 if (configurationParameters.containsKey("doUpdate")) {
336 configurationParameters = new HashMap<>(configurationParameters);
337 Object value = configurationParameters.remove("doUpdate");
338 if (value instanceof Boolean doUpdate && doUpdate) {
339 Update updateComponent = this.updateComponent;
340 if (updateComponent == null) {
342 "Received update command for Home Assistant device {}, but it does not have an update component.",
343 getThing().getUID());
345 updateComponent.doUpdate();
349 super.handleConfigurationUpdate(configurationParameters);
352 private boolean updateThingType(ThingTypeUID typeID) {
353 // if this is a dynamic type, then we update the type
354 if (!MqttBindingConstants.HOMEASSISTANT_MQTT_THING.equals(typeID)) {
355 var thingTypeBuilder = channelTypeProvider.derive(typeID, MqttBindingConstants.HOMEASSISTANT_MQTT_THING);
357 if (getThing().getThingTypeUID().equals(MqttBindingConstants.HOMEASSISTANT_MQTT_THING)) {
358 logger.debug("Migrating Home Assistant thing {} from generic type to dynamic type {}",
359 getThing().getUID(), typeID);
361 // just create an empty thing type for now; channel configurations won't follow over
362 // to the re-created Thing, so we need to re-discover them all anyway
363 channelTypeProvider.putThingType(thingTypeBuilder.build());
364 changeThingType(typeID, getConfig());
368 synchronized (haComponents) { // sync whenever discoverComponents is started
369 var sortedComponents = haComponents.values().stream().sorted(COMPONENT_COMPARATOR).toList();
371 var channelGroupTypes = sortedComponents.stream().map(c -> c.getChannelGroupType(typeID.getId()))
372 .filter(Objects::nonNull).map(Objects::requireNonNull).toList();
373 channelTypeProvider.updateChannelGroupTypesForPrefix(typeID.getId(), channelGroupTypes);
375 var groupDefs = sortedComponents.stream().map(c -> c.getGroupDefinition(typeID.getId()))
376 .filter(Objects::nonNull).map(Objects::requireNonNull).toList();
377 var channelDefs = sortedComponents.stream().map(AbstractComponent::getChannelDefinitions)
378 .flatMap(List::stream).toList();
379 thingTypeBuilder.withChannelDefinitions(channelDefs).withChannelGroupDefinitions(groupDefs);
380 Update updateComponent = this.updateComponent;
381 if (updateComponent != null && updateComponent.isUpdatable()) {
382 thingTypeBuilder.withConfigDescriptionURI(UPDATABLE_CONFIG_DESCRIPTION_URI);
385 channelTypeProvider.putThingType(thingTypeBuilder.build());
387 removeStateDescriptions();
388 sortedComponents.stream().forEach(c -> c.addStateDescriptions(stateDescriptionProvider));
390 ThingBuilder thingBuilder = editThing().withChannels();
392 sortedComponents.stream().map(AbstractComponent::getChannels).flatMap(List::stream)
393 .forEach(c -> thingBuilder.withChannel(c));
395 channelStates.clear();
396 sortedComponents.forEach(c -> c.getChannelStates(channelStates));
398 updateThing(thingBuilder.build());
404 private ThingTypeUID calculateThingTypeUID(AbstractComponent component) {
405 return new ThingTypeUID(MqttBindingConstants.BINDING_ID, MqttBindingConstants.HOMEASSISTANT_MQTT_THING.getId()
406 + "_" + component.getChannelConfiguration().getThingId(component.getHaID().objectID));
410 public void handleRemoval() {
411 synchronized (haComponents) {
412 channelTypeProvider.removeThingType(thing.getThingTypeUID());
413 channelTypeProvider.removeChannelGroupTypesForPrefix(thing.getThingTypeUID().getId());
414 removeStateDescriptions();
416 super.handleRemoval();
419 private void removeStateDescriptions() {
420 thing.getChannels().stream().forEach(c -> stateDescriptionProvider.remove(c.getUID()));
423 private void releaseStateUpdated(Update.ReleaseState state) {
424 Map<String, String> properties = editProperties();
425 properties = state.appendToProperties(properties);
426 updateProperties(properties);
429 // should only be called when it's safe to access haComponents
430 private void addComponent(AbstractComponent component) {
431 AbstractComponent existing = haComponents.get(component.getComponentId());
432 if (existing != null) {
433 // DeviceTriggers that are for the same subtype, topic, and value template
434 // can be coalesced together
435 if (component instanceof DeviceTrigger newTrigger && existing instanceof DeviceTrigger oldTrigger
436 && newTrigger.getChannelConfiguration().getSubtype()
437 .equals(oldTrigger.getChannelConfiguration().getSubtype())
438 && newTrigger.getChannelConfiguration().getTopic()
439 .equals(oldTrigger.getChannelConfiguration().getTopic())
440 && oldTrigger.getHaID().nodeID.equals(newTrigger.getHaID().nodeID)) {
441 String newTriggerValueTemplate = newTrigger.getChannelConfiguration().getValueTemplate();
442 String oldTriggerValueTemplate = oldTrigger.getChannelConfiguration().getValueTemplate();
443 if ((newTriggerValueTemplate == null && oldTriggerValueTemplate == null)
444 || (newTriggerValueTemplate != null & oldTriggerValueTemplate != null
445 && newTriggerValueTemplate.equals(oldTriggerValueTemplate))) {
446 // Adjust the set of valid values
447 MqttBrokerConnection connection = this.connection;
449 if (oldTrigger.merge(newTrigger) && connection != null) {
450 // Make sure to re-start if this did something, and it was stopped
451 oldTrigger.start(connection, scheduler, 0).exceptionally(e -> {
452 logger.warn("Failed to start component {}", oldTrigger.getHaID(), e);
456 haComponentsByUniqueId.put(component.getUniqueId(), component);
457 System.out.println("don't forget to add to the channel config");
462 // rename the conflict
463 haComponents.remove(existing.getComponentId());
464 existing.resolveConflict();
465 component.resolveConflict();
466 haComponents.put(existing.getComponentId(), existing);
468 haComponents.put(component.getComponentId(), component);
469 haComponentsByUniqueId.put(component.getUniqueId(), component);
473 * Takes a Configuration where objectid and config are a list, and generates
474 * multiple Configurations where there are single objects
476 private List<Configuration> flattenChannelConfiguration(Configuration multiComponentChannelConfig,
477 ChannelUID channelUID) {
478 Object component = multiComponentChannelConfig.get("component");
479 Object nodeid = multiComponentChannelConfig.get("nodeid");
480 if ((multiComponentChannelConfig.get("objectid") instanceof List objectIds)
481 && (multiComponentChannelConfig.get("config") instanceof List configurations)) {
482 if (objectIds.size() != configurations.size()) {
483 logger.warn("objectid and config for channel {} do not have the same number of items; ignoring",
487 List<Configuration> result = new ArrayList();
488 Iterator<Object> objectIdIterator = objectIds.iterator();
489 Iterator<Object> configIterator = configurations.iterator();
490 while (objectIdIterator.hasNext()) {
491 Configuration componentConfiguration = new Configuration();
492 componentConfiguration.put("component", component);
493 componentConfiguration.put("nodeid", nodeid);
494 componentConfiguration.put("objectid", objectIdIterator.next());
495 componentConfiguration.put("config", configIterator.next());
496 result.add(componentConfiguration);
500 return List.of(multiComponentChannelConfig);
505 Map<@Nullable String, AbstractComponent<?>> getComponents() {