2 * Copyright (c) 2010-2024 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.mqtt.homeassistant.internal.handler;
16 import java.util.Comparator;
17 import java.util.HashMap;
18 import java.util.HashSet;
19 import java.util.List;
21 import java.util.Objects;
22 import java.util.Optional;
24 import java.util.concurrent.CompletableFuture;
25 import java.util.function.Consumer;
27 import org.eclipse.jdt.annotation.NonNullByDefault;
28 import org.eclipse.jdt.annotation.Nullable;
29 import org.openhab.binding.mqtt.generic.AbstractMQTTThingHandler;
30 import org.openhab.binding.mqtt.generic.ChannelState;
31 import org.openhab.binding.mqtt.generic.MqttChannelStateDescriptionProvider;
32 import org.openhab.binding.mqtt.generic.MqttChannelTypeProvider;
33 import org.openhab.binding.mqtt.generic.TransformationServiceProvider;
34 import org.openhab.binding.mqtt.generic.tools.DelayedBatchProcessing;
35 import org.openhab.binding.mqtt.generic.utils.FutureCollector;
36 import org.openhab.binding.mqtt.homeassistant.generic.internal.MqttBindingConstants;
37 import org.openhab.binding.mqtt.homeassistant.internal.ComponentChannel;
38 import org.openhab.binding.mqtt.homeassistant.internal.DiscoverComponents;
39 import org.openhab.binding.mqtt.homeassistant.internal.DiscoverComponents.ComponentDiscovered;
40 import org.openhab.binding.mqtt.homeassistant.internal.HaID;
41 import org.openhab.binding.mqtt.homeassistant.internal.HandlerConfiguration;
42 import org.openhab.binding.mqtt.homeassistant.internal.component.AbstractComponent;
43 import org.openhab.binding.mqtt.homeassistant.internal.component.ComponentFactory;
44 import org.openhab.binding.mqtt.homeassistant.internal.component.Update;
45 import org.openhab.binding.mqtt.homeassistant.internal.config.ChannelConfigurationTypeAdapterFactory;
46 import org.openhab.binding.mqtt.homeassistant.internal.exception.ConfigurationException;
47 import org.openhab.core.config.core.validation.ConfigValidationException;
48 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
49 import org.openhab.core.thing.Channel;
50 import org.openhab.core.thing.ChannelUID;
51 import org.openhab.core.thing.Thing;
52 import org.openhab.core.thing.ThingStatus;
53 import org.openhab.core.thing.ThingStatusDetail;
54 import org.openhab.core.thing.ThingTypeUID;
55 import org.openhab.core.thing.ThingUID;
56 import org.openhab.core.thing.binding.builder.ThingBuilder;
57 import org.openhab.core.thing.type.ChannelTypeRegistry;
58 import org.slf4j.Logger;
59 import org.slf4j.LoggerFactory;
61 import com.google.gson.Gson;
62 import com.google.gson.GsonBuilder;
65 * Handles HomeAssistant MQTT object things. Such an HA Object can have multiple HA Components with different instances
66 * of those Components. This handler auto-discovers all available Components and Component Instances and
67 * adds any new appearing components over time.<br>
70 * The specification does not cover the case of disappearing Components. This handler doesn't as well therefore.<br>
73 * A Component Instance equals a Channel Group and the Component parts equal Channels.<br>
76 * If a Components configuration changes, the known ChannelGroupType and ChannelTypes are replaced with the new ones.
78 * @author David Graeff - Initial contribution
81 public class HomeAssistantThingHandler extends AbstractMQTTThingHandler
82 implements ComponentDiscovered, Consumer<List<AbstractComponent<?>>> {
83 public static final String AVAILABILITY_CHANNEL = "availability";
84 private static final Comparator<AbstractComponent<?>> COMPONENT_COMPARATOR = Comparator
85 .comparing((AbstractComponent<?> component) -> component.hasGroup())
86 .thenComparing(AbstractComponent::getName);
87 private static final URI UPDATABLE_CONFIG_DESCRIPTION_URI = URI.create("thing-type:mqtt:homeassistant-updatable");
89 private final Logger logger = LoggerFactory.getLogger(HomeAssistantThingHandler.class);
91 protected final MqttChannelTypeProvider channelTypeProvider;
92 protected final MqttChannelStateDescriptionProvider stateDescriptionProvider;
93 protected final ChannelTypeRegistry channelTypeRegistry;
94 public final int attributeReceiveTimeout;
95 protected final DelayedBatchProcessing<AbstractComponent<?>> delayedProcessing;
96 protected final DiscoverComponents discoverComponents;
98 private final Gson gson;
99 protected final Map<@Nullable String, AbstractComponent<?>> haComponents = new HashMap<>();
101 protected HandlerConfiguration config = new HandlerConfiguration();
102 private Set<HaID> discoveryHomeAssistantIDs = new HashSet<>();
104 protected final TransformationServiceProvider transformationServiceProvider;
106 private boolean started;
107 private boolean newStyleChannels;
108 private @Nullable Update updateComponent;
111 * Create a new thing handler for HomeAssistant MQTT components.
112 * A channel type provider and a topic value receive timeout must be provided.
114 * @param thing The thing of this handler
115 * @param channelTypeProvider A channel type provider
116 * @param subscribeTimeout Timeout for the entire tree parsing and subscription. In milliseconds.
117 * @param attributeReceiveTimeout The timeout per attribute field subscription. In milliseconds.
119 public HomeAssistantThingHandler(Thing thing, MqttChannelTypeProvider channelTypeProvider,
120 MqttChannelStateDescriptionProvider stateDescriptionProvider, ChannelTypeRegistry channelTypeRegistry,
121 TransformationServiceProvider transformationServiceProvider, int subscribeTimeout,
122 int attributeReceiveTimeout) {
123 super(thing, subscribeTimeout);
124 this.gson = new GsonBuilder().registerTypeAdapterFactory(new ChannelConfigurationTypeAdapterFactory()).create();
125 this.channelTypeProvider = channelTypeProvider;
126 this.stateDescriptionProvider = stateDescriptionProvider;
127 this.channelTypeRegistry = channelTypeRegistry;
128 this.transformationServiceProvider = transformationServiceProvider;
129 this.attributeReceiveTimeout = attributeReceiveTimeout;
130 this.delayedProcessing = new DelayedBatchProcessing<>(attributeReceiveTimeout, this, scheduler);
132 newStyleChannels = "true".equals(thing.getProperties().get("newStyleChannels"));
134 this.discoverComponents = new DiscoverComponents(thing.getUID(), scheduler, this, this, gson,
135 this.transformationServiceProvider, newStyleChannels);
139 public void initialize() {
142 config = getConfigAs(HandlerConfiguration.class);
143 if (config.topics.isEmpty()) {
144 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_ERROR, "Device topics unknown");
147 discoveryHomeAssistantIDs.addAll(HaID.fromConfig(config));
149 ThingTypeUID typeID = getThing().getThingTypeUID();
150 for (Channel channel : thing.getChannels()) {
151 final String groupID = channel.getUID().getGroupId();
152 // Already restored component?
154 AbstractComponent<?> component = haComponents.get(groupID);
156 HaID haID = HaID.fromConfig(config.basetopic, channel.getConfiguration());
157 discoveryHomeAssistantIDs.add(haID);
158 ThingUID thingUID = channel.getUID().getThingUID();
159 String channelConfigurationJSON = (String) channel.getConfiguration().get("config");
160 if (channelConfigurationJSON == null) {
161 logger.warn("Provided channel does not have a 'config' configuration key!");
164 component = ComponentFactory.createComponent(thingUID, haID, channelConfigurationJSON, this, this,
165 scheduler, gson, transformationServiceProvider, newStyleChannels);
166 if (typeID.equals(MqttBindingConstants.HOMEASSISTANT_MQTT_THING)) {
167 typeID = calculateThingTypeUID(component);
170 haComponents.put(component.getGroupId(), component);
171 } catch (ConfigurationException e) {
172 logger.error("Cannot restore component {}: {}", thing, e.getMessage());
176 if (updateThingType(typeID)) {
182 public void dispose() {
183 removeStateDescriptions();
184 // super.dispose() calls stop()
189 public CompletableFuture<Void> unsubscribeAll() {
190 // already unsubscribed everything by calling stop()
191 return CompletableFuture.allOf();
195 * Start a background discovery for the configured HA MQTT object-id.
198 protected CompletableFuture<@Nullable Void> start(MqttBrokerConnection connection) {
201 connection.setQos(1);
202 updateStatus(ThingStatus.UNKNOWN);
204 // Start all known components and channels within the components and put the Thing offline
205 // if any subscribing failed ( == broker connection lost)
206 CompletableFuture<@Nullable Void> future = CompletableFuture.allOf(super.start(connection),
207 haComponents.values().stream().map(e -> e.start(connection, scheduler, attributeReceiveTimeout))
208 .reduce(CompletableFuture.completedFuture(null), (a, v) -> a.thenCompose(b -> v)) // reduce to
210 .exceptionally(e -> {
211 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_ERROR, e.getMessage());
216 .thenCompose(b -> discoverComponents.startDiscovery(connection, 0, discoveryHomeAssistantIDs, this));
220 protected void stop() {
222 discoverComponents.stopDiscovery();
223 delayedProcessing.join();
224 // haComponents does not need to be synchronised -> the discovery thread is disabled
225 haComponents.values().stream().map(AbstractComponent::stop) //
226 // we need to join all the stops, otherwise they might not be done when start is called
227 .collect(FutureCollector.allOf()).join();
235 public @Nullable ChannelState getChannelState(ChannelUID channelUID) {
237 if (channelUID.isInGroup()) {
238 componentId = channelUID.getGroupId();
240 componentId = channelUID.getId();
242 AbstractComponent<?> component;
243 synchronized (haComponents) { // sync whenever discoverComponents is started
244 component = haComponents.get(componentId);
246 if (component == null) {
247 component = haComponents.get("");
248 if (component == null) {
252 ComponentChannel componentChannel = component.getChannel(channelUID.getIdWithoutGroup());
253 if (componentChannel == null) {
256 return componentChannel.getState();
260 * Callback of {@link DiscoverComponents}. Add to a delayed batch processor.
263 public void componentDiscovered(HaID homeAssistantTopicID, AbstractComponent<?> component) {
264 delayedProcessing.accept(component);
268 * Callback of {@link DelayedBatchProcessing}.
269 * Add all newly discovered components to the Thing and start the components.
272 public void accept(List<AbstractComponent<?>> discoveredComponentsList) {
273 MqttBrokerConnection connection = this.connection;
274 if (connection == null) {
278 synchronized (haComponents) { // sync whenever discoverComponents is started
279 ThingTypeUID typeID = getThing().getThingTypeUID();
280 for (AbstractComponent<?> discovered : discoveredComponentsList) {
281 if (typeID.equals(MqttBindingConstants.HOMEASSISTANT_MQTT_THING)) {
282 typeID = calculateThingTypeUID(discovered);
284 String id = discovered.getGroupId();
285 AbstractComponent<?> known = haComponents.get(id);
286 // Is component already known?
288 if (discovered.getConfigHash() != known.getConfigHash()) {
289 // Don't wait for the future to complete. We are also not interested in failures.
290 // The component will be replaced in a moment.
293 known.setConfigSeen();
298 // Add component to the component map
299 haComponents.put(id, discovered);
300 // Start component / Subscribe to channel topics
301 discovered.start(connection, scheduler, 0).exceptionally(e -> {
302 logger.warn("Failed to start component {}", discovered.getHaID(), e);
306 if (discovered instanceof Update) {
307 updateComponent = (Update) discovered;
308 updateComponent.setReleaseStateUpdateListener(this::releaseStateUpdated);
311 updateThingType(typeID);
316 protected void updateThingStatus(boolean messageReceived, Optional<Boolean> availabilityTopicsSeen) {
317 if (availabilityTopicsSeen.orElse(messageReceived)) {
318 updateStatus(ThingStatus.ONLINE, ThingStatusDetail.NONE);
320 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.NONE);
325 public void handleConfigurationUpdate(Map<String, Object> configurationParameters)
326 throws ConfigValidationException {
327 if (configurationParameters.containsKey("doUpdate")) {
328 configurationParameters = new HashMap<>(configurationParameters);
329 Object value = configurationParameters.remove("doUpdate");
330 if (value instanceof Boolean doUpdate && doUpdate) {
331 Update updateComponent = this.updateComponent;
332 if (updateComponent == null) {
334 "Received update command for Home Assistant device {}, but it does not have an update component.",
335 getThing().getUID());
337 updateComponent.doUpdate();
341 super.handleConfigurationUpdate(configurationParameters);
344 private boolean updateThingType(ThingTypeUID typeID) {
345 // if this is a dynamic type, then we update the type
346 if (!MqttBindingConstants.HOMEASSISTANT_MQTT_THING.equals(typeID)) {
347 var thingTypeBuilder = channelTypeProvider.derive(typeID, MqttBindingConstants.HOMEASSISTANT_MQTT_THING);
349 if (getThing().getThingTypeUID().equals(MqttBindingConstants.HOMEASSISTANT_MQTT_THING)) {
350 logger.debug("Migrating Home Assistant thing {} from generic type to dynamic type {}",
351 getThing().getUID(), typeID);
353 // just create an empty thing type for now; channel configurations won't follow over
354 // to the re-created Thing, so we need to re-discover them all anyway
355 channelTypeProvider.putThingType(thingTypeBuilder.build());
356 changeThingType(typeID, getConfig());
360 synchronized (haComponents) { // sync whenever discoverComponents is started
361 var sortedComponents = haComponents.values().stream().sorted(COMPONENT_COMPARATOR).toList();
363 var channelGroupTypes = sortedComponents.stream().map(c -> c.getChannelGroupType(typeID.getId()))
364 .filter(Objects::nonNull).map(Objects::requireNonNull).toList();
365 channelTypeProvider.updateChannelGroupTypesForPrefix(typeID.getId(), channelGroupTypes);
367 var groupDefs = sortedComponents.stream().map(c -> c.getGroupDefinition(typeID.getId()))
368 .filter(Objects::nonNull).map(Objects::requireNonNull).toList();
369 var channelDefs = sortedComponents.stream().map(AbstractComponent::getChannelDefinitions)
370 .flatMap(List::stream).toList();
371 thingTypeBuilder.withChannelDefinitions(channelDefs).withChannelGroupDefinitions(groupDefs);
372 Update updateComponent = this.updateComponent;
373 if (updateComponent != null && updateComponent.isUpdatable()) {
374 thingTypeBuilder.withConfigDescriptionURI(UPDATABLE_CONFIG_DESCRIPTION_URI);
377 channelTypeProvider.putThingType(thingTypeBuilder.build());
379 removeStateDescriptions();
380 sortedComponents.stream().forEach(c -> c.addStateDescriptions(stateDescriptionProvider));
382 ThingBuilder thingBuilder = editThing().withChannels();
384 sortedComponents.stream().map(AbstractComponent::getChannels).flatMap(List::stream)
385 .forEach(c -> thingBuilder.withChannel(c));
387 updateThing(thingBuilder.build());
393 private ThingTypeUID calculateThingTypeUID(AbstractComponent component) {
394 return new ThingTypeUID(MqttBindingConstants.BINDING_ID, MqttBindingConstants.HOMEASSISTANT_MQTT_THING.getId()
395 + "_" + component.getChannelConfiguration().getThingId(component.getHaID().objectID));
399 public void handleRemoval() {
400 synchronized (haComponents) {
401 channelTypeProvider.removeThingType(thing.getThingTypeUID());
402 channelTypeProvider.removeChannelGroupTypesForPrefix(thing.getThingTypeUID().getId());
403 removeStateDescriptions();
405 super.handleRemoval();
408 private void removeStateDescriptions() {
409 thing.getChannels().stream().forEach(c -> stateDescriptionProvider.remove(c.getUID()));
412 private void releaseStateUpdated(Update.ReleaseState state) {
413 Map<String, String> properties = editProperties();
414 properties = state.appendToProperties(properties);
415 updateProperties(properties);