2 * Copyright (c) 2010-2022 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.mqtt.handler;
15 import java.util.Collection;
16 import java.util.Collections;
17 import java.util.HashMap;
19 import java.util.concurrent.CompletableFuture;
20 import java.util.concurrent.TimeoutException;
22 import org.eclipse.jdt.annotation.NonNullByDefault;
23 import org.eclipse.jdt.annotation.Nullable;
24 import org.openhab.binding.mqtt.discovery.MQTTTopicDiscoveryParticipant;
25 import org.openhab.binding.mqtt.discovery.TopicSubscribe;
26 import org.openhab.binding.mqtt.internal.action.MQTTActions;
27 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
28 import org.openhab.core.io.transport.mqtt.MqttConnectionObserver;
29 import org.openhab.core.io.transport.mqtt.MqttConnectionState;
30 import org.openhab.core.io.transport.mqtt.MqttService;
31 import org.openhab.core.thing.Bridge;
32 import org.openhab.core.thing.Channel;
33 import org.openhab.core.thing.ChannelUID;
34 import org.openhab.core.thing.ThingStatus;
35 import org.openhab.core.thing.ThingStatusDetail;
36 import org.openhab.core.thing.binding.BaseBridgeHandler;
37 import org.openhab.core.thing.binding.ThingHandlerService;
38 import org.openhab.core.types.Command;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
43 * This base implementation handles connection changes of the {@link MqttBrokerConnection}
44 * and puts the Thing on or offline. It also handles adding/removing notifications of the
45 * {@link MqttService} and provides a basic dispose() implementation.
47 * @author David Graeff - Initial contribution
50 public abstract class AbstractBrokerHandler extends BaseBridgeHandler implements MqttConnectionObserver {
51 public static final int TIMEOUT_DEFAULT = 1200; /* timeout in milliseconds */
52 private final Logger logger = LoggerFactory.getLogger(AbstractBrokerHandler.class);
54 final Map<ChannelUID, PublishTriggerChannel> channelStateByChannelUID = new HashMap<>();
55 private final Map<String, @Nullable Map<MQTTTopicDiscoveryParticipant, @Nullable TopicSubscribe>> discoveryTopics = new HashMap<>();
57 protected @Nullable MqttBrokerConnection connection;
58 protected CompletableFuture<MqttBrokerConnection> connectionFuture = new CompletableFuture<>();
60 public AbstractBrokerHandler(Bridge thing) {
65 public Collection<Class<? extends ThingHandlerService>> getServices() {
66 return Collections.singleton(MQTTActions.class);
70 * Returns the underlying {@link MqttBrokerConnection} either immediately or after {@link #initialize()} has
73 public CompletableFuture<MqttBrokerConnection> getConnectionAsync() {
74 return connectionFuture;
78 * Returns the underlying {@link MqttBrokerConnection}.
80 public @Nullable MqttBrokerConnection getConnection() {
85 * Does nothing in the base implementation.
88 public void handleCommand(ChannelUID channelUID, Command command) {
89 // No commands to handle
93 * Registers a connection status listener and attempts a connection if there is none so far.
96 public void initialize() {
97 final MqttBrokerConnection connection = this.connection;
98 if (connection == null) {
99 logger.warn("Trying to initialize {} but connection is null. This is most likely a bug.", thing.getUID());
102 for (Channel channel : thing.getChannels()) {
103 final PublishTriggerChannelConfig channelConfig = channel.getConfiguration()
104 .as(PublishTriggerChannelConfig.class);
105 PublishTriggerChannel c = new PublishTriggerChannel(channelConfig, channel.getUID(), connection, this);
106 channelStateByChannelUID.put(channel.getUID(), c);
109 connection.addConnectionObserver(this);
111 connection.start().exceptionally(e -> {
112 connectionStateChanged(MqttConnectionState.DISCONNECTED, e);
116 connectionStateChanged(MqttConnectionState.DISCONNECTED, new TimeoutException("Timeout"));
119 connectionFuture.complete(connection);
121 discoveryTopics.forEach((topic, listenerMap) -> {
122 listenerMap.replaceAll((listener, oldTopicSubscribe) -> {
123 if (oldTopicSubscribe.isStarted()) {
124 oldTopicSubscribe.stop();
127 TopicSubscribe topicSubscribe = new TopicSubscribe(connection, topic, listener, thing.getUID());
128 if (discoveryEnabled()) {
129 topicSubscribe.start().handle((result, ex) -> {
131 logger.warn("Failed to subscribe {} to discovery topic {} on broker {}", listener, topic,
134 logger.trace("Subscribed {} to discovery topic {} on broker {}", listener, topic,
140 return topicSubscribe;
146 public void connectionStateChanged(MqttConnectionState state, @Nullable Throwable error) {
147 if (state == MqttConnectionState.CONNECTED) {
148 updateStatus(ThingStatus.ONLINE);
149 channelStateByChannelUID.values().forEach(PublishTriggerChannel::start);
151 channelStateByChannelUID.values().forEach(PublishTriggerChannel::stop);
153 updateStatus(ThingStatus.OFFLINE);
155 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR, error.getMessage());
161 protected void triggerChannel(ChannelUID channelUID, String event) {
162 super.triggerChannel(channelUID, event);
166 * Removes listeners to the {@link MqttBrokerConnection}.
169 public void dispose() {
170 channelStateByChannelUID.values().forEach(PublishTriggerChannel::stop);
171 channelStateByChannelUID.clear();
173 // keep topics, but stop subscriptions
174 discoveryTopics.forEach((topic, listenerMap) -> {
175 listenerMap.forEach((listener, topicSubscribe) -> {
176 topicSubscribe.stop();
180 if (connection != null) {
181 connection.removeConnectionObserver(this);
183 logger.warn("Trying to dispose handler {} but connection is already null. Most likely this is a bug.",
186 this.connection = null;
187 connectionFuture = new CompletableFuture<>();
192 * register a discovery listener to a specified topic on this broker (used by the handler factory)
194 * @param listener the discovery participant that wishes to be notified about this topic
195 * @param topic the topic (wildcards supported)
197 public final void registerDiscoveryListener(MQTTTopicDiscoveryParticipant listener, String topic) {
198 Map<MQTTTopicDiscoveryParticipant, @Nullable TopicSubscribe> topicListeners = discoveryTopics
199 .computeIfAbsent(topic, t -> new HashMap<>());
200 topicListeners.compute(listener, (k, v) -> {
202 logger.warn("Duplicate subscription for {} to discovery topic {} on broker {}. Check discovery logic!",
203 listener, topic, thing.getUID());
207 TopicSubscribe topicSubscribe = new TopicSubscribe(connection, topic, listener, thing.getUID());
208 if (discoveryEnabled()) {
209 topicSubscribe.start().handle((result, ex) -> {
211 logger.warn("Failed to subscribe {} to discovery topic {} on broker {}", listener, topic,
214 logger.trace("Subscribed {} to discovery topic {} on broker {}", listener, topic,
220 return topicSubscribe;
225 * unregisters a discovery listener from a specified topic on this broker (used by the handler factory)
227 * @param listener the discovery participant that wishes no notifications about this topic
228 * @param topic the topic (as specified during registration)
230 public final void unregisterDiscoveryListener(MQTTTopicDiscoveryParticipant listener, String topic) {
231 discoveryTopics.compute(topic, (k, v) -> {
234 "Tried to unsubscribe {} from discovery topic {} on broker {} but topic not registered at all. Check discovery logic!",
235 listener, topic, thing.getUID());
238 v.compute(listener, (l, w) -> {
241 "Tried to unsubscribe {} from discovery topic {} on broker {} but topic not registered for listener. Check discovery logic!",
242 listener, topic, thing.getUID());
245 logger.trace("Unsubscribed {} from discovery topic {} on broker {}", listener, topic,
250 return v.isEmpty() ? null : v;
255 * check whether discovery is disabled on this broker
257 * @return true if discovery disabled
259 public abstract boolean discoveryEnabled();