2 * Copyright (c) 2010-2021 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.mqtt.homeassistant.internal.handler;
15 import java.util.Collection;
16 import java.util.HashMap;
17 import java.util.HashSet;
18 import java.util.List;
20 import java.util.Optional;
22 import java.util.concurrent.CompletableFuture;
23 import java.util.function.Consumer;
24 import java.util.stream.Collectors;
26 import org.eclipse.jdt.annotation.NonNullByDefault;
27 import org.eclipse.jdt.annotation.Nullable;
28 import org.openhab.binding.mqtt.generic.AbstractMQTTThingHandler;
29 import org.openhab.binding.mqtt.generic.ChannelState;
30 import org.openhab.binding.mqtt.generic.MqttChannelTypeProvider;
31 import org.openhab.binding.mqtt.generic.TransformationServiceProvider;
32 import org.openhab.binding.mqtt.generic.tools.DelayedBatchProcessing;
33 import org.openhab.binding.mqtt.generic.utils.FutureCollector;
34 import org.openhab.binding.mqtt.homeassistant.generic.internal.MqttBindingConstants;
35 import org.openhab.binding.mqtt.homeassistant.internal.ComponentChannel;
36 import org.openhab.binding.mqtt.homeassistant.internal.DiscoverComponents;
37 import org.openhab.binding.mqtt.homeassistant.internal.DiscoverComponents.ComponentDiscovered;
38 import org.openhab.binding.mqtt.homeassistant.internal.HaID;
39 import org.openhab.binding.mqtt.homeassistant.internal.HandlerConfiguration;
40 import org.openhab.binding.mqtt.homeassistant.internal.component.AbstractComponent;
41 import org.openhab.binding.mqtt.homeassistant.internal.component.ComponentFactory;
42 import org.openhab.binding.mqtt.homeassistant.internal.config.ChannelConfigurationTypeAdapterFactory;
43 import org.openhab.binding.mqtt.homeassistant.internal.exception.ConfigurationException;
44 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
45 import org.openhab.core.thing.Channel;
46 import org.openhab.core.thing.ChannelUID;
47 import org.openhab.core.thing.Thing;
48 import org.openhab.core.thing.ThingStatus;
49 import org.openhab.core.thing.ThingStatusDetail;
50 import org.openhab.core.thing.ThingTypeUID;
51 import org.openhab.core.thing.ThingUID;
52 import org.openhab.core.thing.type.ChannelDefinition;
53 import org.openhab.core.thing.type.ChannelGroupDefinition;
54 import org.openhab.core.thing.type.ChannelGroupType;
55 import org.openhab.core.thing.type.ThingType;
56 import org.openhab.core.thing.util.ThingHelper;
57 import org.slf4j.Logger;
58 import org.slf4j.LoggerFactory;
60 import com.google.gson.Gson;
61 import com.google.gson.GsonBuilder;
64 * Handles HomeAssistant MQTT object things. Such an HA Object can have multiple HA Components with different instances
65 * of those Components. This handler auto-discovers all available Components and Component Instances and
66 * adds any newly appearing components over time.<br>
69 * The specification does not cover the case of disappearing Components, and therefore neither does this handler.<br>
72 * A Component Instance equals a Channel Group and the Component parts equal Channels.<br>
75 * If a Component's configuration changes, the known ChannelGroupType and ChannelTypes are replaced with the new ones.
77 * @author David Graeff - Initial contribution
80 public class HomeAssistantThingHandler extends AbstractMQTTThingHandler
81 implements ComponentDiscovered, Consumer<List<AbstractComponent<?>>> {
// The handler implements both callback interfaces itself: componentDiscovered(...) for ComponentDiscovered and
// accept(List) for the Consumer of component batches.
// String constant "availability"; none of the visible lines of this file reference it.
82 public static final String AVAILABILITY_CHANNEL = "availability";
84 private final Logger logger = LoggerFactory.getLogger(HomeAssistantThingHandler.class);
// Registry that component channel types are added to / removed from, and that stores the derived ThingType.
86 protected final MqttChannelTypeProvider channelTypeProvider;
// Timeout handed to DelayedBatchProcessing in the constructor and to AbstractComponent.start(...) in start().
87 public final int attributeReceiveTimeout;
// Collects single discovered components (see componentDiscovered) and delivers them in batches to accept(List).
88 protected final DelayedBatchProcessing<AbstractComponent<?>> delayedProcessing;
// Discovery helper; started in start(...) and stopped in stop().
89 protected final DiscoverComponents discoverComponents;
// Gson instance with the ChannelConfigurationTypeAdapterFactory registered; shared with discovery and the factory.
91 private final Gson gson;
// Known components keyed by getGroupUID().getId(). getChannelState, accept and updateThingType access the map
// inside synchronized (haComponents); initialize, dispose, start and stop access it without that lock.
92 protected final Map<String, AbstractComponent<?>> haComponents = new HashMap<>();
// Replaced by getConfigAs(HandlerConfiguration.class) in initialize().
94 protected HandlerConfiguration config = new HandlerConfiguration();
// IDs filled in initialize() and passed to discoverComponents.startDiscovery(...) in start().
95 private Set<HaID> discoveryHomeAssistantIDs = new HashSet<>();
97 protected final TransformationServiceProvider transformationServiceProvider;
// NOTE(review): no read or write of this flag appears in the visible lines of this file - confirm how it is
// maintained by start()/stop()/initialize().
99 private boolean started;
102 * Create a new thing handler for HomeAssistant MQTT components.
103 * A channel type provider and a topic value receive timeout must be provided.
105 * @param thing The thing of this handler
106 * @param channelTypeProvider A channel type provider
107 * @param subscribeTimeout Timeout for the entire tree parsing and subscription. In milliseconds.
108 * @param attributeReceiveTimeout The timeout per attribute field subscription. In milliseconds.
110 public HomeAssistantThingHandler(Thing thing, MqttChannelTypeProvider channelTypeProvider,
111 TransformationServiceProvider transformationServiceProvider, int subscribeTimeout,
112 int attributeReceiveTimeout) {
// Wires up the collaborators; nothing is subscribed or discovered here.
// NOTE(review): the doc comment preceding this constructor has no @param entry for transformationServiceProvider.
113 super(thing, subscribeTimeout);
114 this.gson = new GsonBuilder().registerTypeAdapterFactory(new ChannelConfigurationTypeAdapterFactory()).create();
115 this.channelTypeProvider = channelTypeProvider;
116 this.transformationServiceProvider = transformationServiceProvider;
117 this.attributeReceiveTimeout = attributeReceiveTimeout;
// `this` is the batch consumer: batches arrive in accept(List). `scheduler` is not declared in the visible part
// of this file - presumably inherited from the base handler; confirm.
118 this.delayedProcessing = new DelayedBatchProcessing<>(attributeReceiveTimeout, this, scheduler);
// `this` is passed twice; which role each of the two arguments fills is defined by DiscoverComponents and is not
// visible here.
119 this.discoverComponents = new DiscoverComponents(thing.getUID(), scheduler, this, this, gson,
120 this.transformationServiceProvider);
// Loads the handler configuration and re-creates a component for every channel already stored on the Thing.
// NOTE(review): the source numbering jumps several times inside this method (e.g. 130 -> 133, 162 -> 173), so the
// exits of the guard clauses and the tail of the method are not visible in this excerpt.
123 @SuppressWarnings({ "null", "unused" })
125 public void initialize() {
128 config = getConfigAs(HandlerConfiguration.class);
// Without configured topics the Thing is reported OFFLINE with a configuration error.
129 if (config.topics == null || config.topics.isEmpty()) {
130 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_ERROR, "Device topics unknown");
// Seed the ID set later handed to discovery with the IDs derived from the configuration.
133 discoveryHomeAssistantIDs.addAll(HaID.fromConfig(config));
135 for (Channel channel : thing.getChannels()) {
136 final String groupID = channel.getUID().getGroupId();
// Channels without a channel group are reported with a warning.
// NOTE(review): "groupd ID" in the message below looks like a typo for "group ID"; runtime string left unchanged.
137 if (groupID == null) {
138 logger.warn("Channel {} has no groupd ID", channel.getLabel());
141 // Already restored component?
143 AbstractComponent<?> component = haComponents.get(groupID);
144 if (component != null) {
145 // the types may have been removed in dispose() so we need to add them again
146 component.addChannelTypes(channelTypeProvider);
// No component is known for this group: derive its HaID from the channel configuration and remember it for
// discovery.
150 HaID haID = HaID.fromConfig(config.basetopic, channel.getConfiguration());
151 discoveryHomeAssistantIDs.add(haID);
152 ThingUID thingUID = channel.getUID().getThingUID();
// The channel's "config" entry carries the JSON that is handed to ComponentFactory.createComponent below.
153 String channelConfigurationJSON = (String) channel.getConfiguration().get("config");
154 if (channelConfigurationJSON == null) {
155 logger.warn("Provided channel does not have a 'config' configuration key!");
// `this` is passed twice to the factory; the roles of those two arguments are not visible here.
158 component = ComponentFactory.createComponent(thingUID, haID, channelConfigurationJSON, this, this,
159 scheduler, gson, transformationServiceProvider);
// Same key (group UID id) that getChannelState() and accept() use for lookups.
160 haComponents.put(component.getGroupUID().getId(), component);
161 component.addChannelTypes(channelTypeProvider);
// NOTE(review): only e.getMessage() is logged (no stack trace), the placeholder receives `thing` rather than the
// component, and "Cannot not restore" looks like a typo - runtime string left unchanged.
162 } catch (ConfigurationException e) {
163 logger.error("Cannot not restore component {}: {}", thing, e.getMessage());
173 public void dispose() {
// Unregisters the channel types of every known component from the channel type provider.
// NOTE(review): the super.dispose() call that the comment below refers to is not among the visible lines
// (numbering jumps 174 -> 176) - confirm it is present.
174 // super.dispose() calls stop()
176 haComponents.values().forEach(c -> c.removeChannelTypes(channelTypeProvider));
180 public CompletableFuture<Void> unsubscribeAll() {
// Performs no unsubscription itself; CompletableFuture.allOf() with no arguments yields an already completed
// future.
181 // already unsubscribed everything by calling stop()
182 return CompletableFuture.allOf();
186 * Start a background discovery for the configured HA MQTT object-id.
189 protected CompletableFuture<@Nullable Void> start(MqttBrokerConnection connection) {
// Sets QoS 1 on the connection, marks the Thing UNKNOWN, starts all known components and chains discovery.
// NOTE(review): numbering jumps 189 -> 192 and 201 -> 206, so some statements of this method are not visible.
192 connection.setQos(1);
193 updateStatus(ThingStatus.UNKNOWN);
195 // Start all known components and channels within the components and put the Thing offline
196 // if any subscribing failed ( == broker connection lost)
// NOTE(review): the comment above attributes failures to a lost broker connection, while the status detail
// reported below is CONFIGURATION_ERROR - confirm which is intended.
197 CompletableFuture<@Nullable Void> future = haComponents.values().parallelStream()
198 .map(e -> e.start(connection, scheduler, attributeReceiveTimeout))
// The reduce chains the per-component futures with thenCompose so that one future stands for all starts.
199 .reduce(CompletableFuture.completedFuture(null), (a, v) -> a.thenCompose(b -> v)) // reduce to one
200 .exceptionally(e -> {
201 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_ERROR, e.getMessage());
// Discovery for the collected HaIDs is started via thenCompose; the receiver of this call is on a line that is not
// visible - presumably `future`; confirm.
206 .thenCompose(b -> discoverComponents.startDiscovery(connection, 0, discoveryHomeAssistantIDs, this));
210 protected void stop() {
// Stops discovery, calls delayedProcessing.join(), then stops every component and joins the collected result.
// NOTE(review): numbering jumps 210 -> 212 and 217 -> 224; a guard and/or trailing statements may be missing from
// this excerpt.
212 discoverComponents.stopDiscovery();
213 delayedProcessing.join();
214 // haComponents does not need to be synchronised -> the discovery thread is disabled
215 haComponents.values().parallelStream().map(AbstractComponent::stop) //
216 // we need to join all the stops, otherwise they might not be done when start is called
217 .collect(FutureCollector.allOf()).join();
// Resolves the ChannelState for a channel UID: the UID's group id selects the component in haComponents, and the
// id without group selects the channel inside that component.
// NOTE(review): the bodies of the three null checks are not visible in this excerpt; given the @Nullable return
// type they presumably return null - confirm.
224 @SuppressWarnings({ "null", "unused" })
226 public @Nullable ChannelState getChannelState(ChannelUID channelUID) {
227 String groupID = channelUID.getGroupId();
228 if (groupID == null) {
231 AbstractComponent<?> component;
// Only the map lookup happens under the lock; the component itself is used after leaving the synchronized block.
232 synchronized (haComponents) { // sync whenever discoverComponents is started
233 component = haComponents.get(groupID);
235 if (component == null) {
238 ComponentChannel componentChannel = component.getChannel(channelUID.getIdWithoutGroup());
239 if (componentChannel == null) {
242 return componentChannel.getState();
246 * Callback of {@link DiscoverComponents}. Add to a delayed batch processor.
249 public void componentDiscovered(HaID homeAssistantTopicID, AbstractComponent<?> component) {
// Hands the component to the batch processor; homeAssistantTopicID is not used by this method.
250 delayedProcessing.accept(component);
254 * Callback of {@link DelayedBatchProcessing}.
255 * Add all newly discovered components to the Thing and start the components.
// Processes one batch of discovered components: each one is compared with the component already stored under the
// same group UID id, registered, started and its channels added to the Thing.
// NOTE(review): numbering jumps (268 -> 270, 272 -> 275, 275 -> 280, 286 -> 290, 292 -> 300) hide the branching
// between "unknown", "changed" and "unchanged" components and the end of the method.
257 @SuppressWarnings("null")
259 public void accept(List<AbstractComponent<?>> discoveredComponentsList) {
// Work on a local copy of the connection field and check it for null before use.
260 MqttBrokerConnection connection = this.connection;
261 if (connection == null) {
265 synchronized (haComponents) { // sync whenever discoverComponents is started
266 for (AbstractComponent<?> discovered : discoveredComponentsList) {
267 AbstractComponent<?> known = haComponents.get(discovered.getGroupUID().getId());
268 // Is component already known?
// Differing config hashes are treated as a configuration change of an already known component.
270 if (discovered.getConfigHash() != known.getConfigHash()) {
271 // Don't wait for the future to complete. We are also not interested in failures.
272 // The component will be replaced in a moment.
275 known.setConfigSeen();
280 // Add channel and group types to the types registry
281 discovered.addChannelTypes(channelTypeProvider);
282 // Add component to the component map
283 haComponents.put(discovered.getGroupUID().getId(), discovered);
284 // Start component / Subscribe to channel topics
// The third argument is 0 here, whereas start(MqttBrokerConnection) passes attributeReceiveTimeout in that
// position. A failed start is only logged.
285 discovered.start(connection, scheduler, 0).exceptionally(e -> {
286 logger.warn("Failed to start component {}", discovered.getGroupUID(), e);
// Collect the Channel objects of the component and add them to the Thing.
290 Collection<Channel> channels = discovered.getChannelMap().values().stream()
291 .map(ComponentChannel::getChannel).collect(Collectors.toList());
292 ThingHelper.addChannelsToThing(thing, channels);
300 protected void updateThingStatus(boolean messageReceived, Optional<Boolean> availabilityTopicsSeen) {
// The value of availabilityTopicsSeen decides when present; messageReceived is only the fallback for an empty
// Optional. A true result reports ONLINE.
301 if (availabilityTopicsSeen.orElse(messageReceived)) {
302 updateStatus(ThingStatus.ONLINE, ThingStatusDetail.NONE);
// NOTE(review): the line between the two updateStatus calls is not visible (numbering 302 -> 304); presumably
// this is the else branch - confirm.
304 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.NONE);
308 private void updateThingType() {
// Builds a ThingType from the group and channel definitions of all known components and stores it in the channel
// type provider under the Thing's own type UID.
309 // if this is a dynamic type, then we update the type
310 ThingTypeUID typeID = thing.getThingTypeUID();
// Nothing is rebuilt when the Thing still uses the HOMEASSISTANT_MQTT_THING type UID itself.
311 if (!MqttBindingConstants.HOMEASSISTANT_MQTT_THING.equals(typeID)) {
312 List<ChannelGroupDefinition> groupDefs;
313 List<ChannelDefinition> channelDefs;
// Both lists are collected while holding the haComponents lock.
314 synchronized (haComponents) { // sync whenever discoverComponents is started
315 groupDefs = haComponents.values().stream().map(AbstractComponent::getGroupDefinition)
316 .collect(Collectors.toList());
// Flattens the channel definitions of every component's ChannelGroupType into one list.
317 channelDefs = haComponents.values().stream().map(AbstractComponent::getType)
318 .map(ChannelGroupType::getChannelDefinitions).flatMap(List::stream)
319 .collect(Collectors.toList());
// derive(...) returns a builder-style object to which the collected definitions are attached before build().
321 ThingType thingType = channelTypeProvider.derive(typeID, MqttBindingConstants.HOMEASSISTANT_MQTT_THING)
322 .withChannelDefinitions(channelDefs).withChannelGroupDefinitions(groupDefs).build();
324 channelTypeProvider.setThingType(typeID, thingType);