]> git.basschouten.com Git - openhab-addons.git/blob
6eb28d78233a86b80557e9ce0a641f2bf6b09d56
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2022 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.mqtt.handler;
14
15 import java.util.Collection;
16 import java.util.Collections;
17 import java.util.HashMap;
18 import java.util.Map;
19 import java.util.concurrent.CompletableFuture;
20 import java.util.concurrent.TimeoutException;
21
22 import org.eclipse.jdt.annotation.NonNullByDefault;
23 import org.eclipse.jdt.annotation.Nullable;
24 import org.openhab.binding.mqtt.discovery.MQTTTopicDiscoveryParticipant;
25 import org.openhab.binding.mqtt.discovery.TopicSubscribe;
26 import org.openhab.binding.mqtt.internal.action.MQTTActions;
27 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
28 import org.openhab.core.io.transport.mqtt.MqttConnectionObserver;
29 import org.openhab.core.io.transport.mqtt.MqttConnectionState;
30 import org.openhab.core.io.transport.mqtt.MqttService;
31 import org.openhab.core.thing.Bridge;
32 import org.openhab.core.thing.Channel;
33 import org.openhab.core.thing.ChannelUID;
34 import org.openhab.core.thing.ThingStatus;
35 import org.openhab.core.thing.ThingStatusDetail;
36 import org.openhab.core.thing.binding.BaseBridgeHandler;
37 import org.openhab.core.thing.binding.ThingHandlerService;
38 import org.openhab.core.types.Command;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41
42 /**
43  * This base implementation handles connection changes of the {@link MqttBrokerConnection}
44  * and puts the Thing on or offline. It also handles adding/removing notifications of the
45  * {@link MqttService} and provides a basic dispose() implementation.
46  *
47  * @author David Graeff - Initial contribution
48  */
49 @NonNullByDefault
50 public abstract class AbstractBrokerHandler extends BaseBridgeHandler implements MqttConnectionObserver {
51     public static final int TIMEOUT_DEFAULT = 1200; /* timeout in milliseconds */
52     private final Logger logger = LoggerFactory.getLogger(AbstractBrokerHandler.class);
53
54     final Map<ChannelUID, PublishTriggerChannel> channelStateByChannelUID = new HashMap<>();
55     private final Map<String, @Nullable Map<MQTTTopicDiscoveryParticipant, @Nullable TopicSubscribe>> discoveryTopics = new HashMap<>();
56
57     protected @Nullable MqttBrokerConnection connection;
58     protected CompletableFuture<MqttBrokerConnection> connectionFuture = new CompletableFuture<>();
59
60     public AbstractBrokerHandler(Bridge thing) {
61         super(thing);
62     }
63
64     @Override
65     public Collection<Class<? extends ThingHandlerService>> getServices() {
66         return Collections.singleton(MQTTActions.class);
67     }
68
69     /**
70      * Returns the underlying {@link MqttBrokerConnection} either immediately or after {@link #initialize()} has
71      * performed.
72      */
73     public CompletableFuture<MqttBrokerConnection> getConnectionAsync() {
74         return connectionFuture;
75     }
76
77     /**
78      * Returns the underlying {@link MqttBrokerConnection}.
79      */
80     public @Nullable MqttBrokerConnection getConnection() {
81         return connection;
82     }
83
84     /**
85      * Does nothing in the base implementation.
86      */
87     @Override
88     public void handleCommand(ChannelUID channelUID, Command command) {
89         // No commands to handle
90     }
91
92     /**
93      * Registers a connection status listener and attempts a connection if there is none so far.
94      */
95     @Override
96     public void initialize() {
97         final MqttBrokerConnection connection = this.connection;
98         if (connection == null) {
99             logger.warn("Trying to initialize {} but connection is null. This is most likely a bug.", thing.getUID());
100             return;
101         }
102         for (Channel channel : thing.getChannels()) {
103             final PublishTriggerChannelConfig channelConfig = channel.getConfiguration()
104                     .as(PublishTriggerChannelConfig.class);
105             PublishTriggerChannel c = new PublishTriggerChannel(channelConfig, channel.getUID(), connection, this);
106             channelStateByChannelUID.put(channel.getUID(), c);
107         }
108
109         connection.addConnectionObserver(this);
110
111         connection.start().exceptionally(e -> {
112             connectionStateChanged(MqttConnectionState.DISCONNECTED, e);
113             return false;
114         }).thenAccept(v -> {
115             if (!v) {
116                 connectionStateChanged(MqttConnectionState.DISCONNECTED, new TimeoutException("Timeout"));
117             } else {
118                 connectionStateChanged(MqttConnectionState.CONNECTED, null);
119             }
120         });
121         connectionFuture.complete(connection);
122
123         discoveryTopics.forEach((topic, listenerMap) -> {
124             listenerMap.replaceAll((listener, oldTopicSubscribe) -> {
125                 if (oldTopicSubscribe.isStarted()) {
126                     oldTopicSubscribe.stop();
127                 }
128
129                 TopicSubscribe topicSubscribe = new TopicSubscribe(connection, topic, listener, thing.getUID());
130                 if (discoveryEnabled()) {
131                     topicSubscribe.start().handle((result, ex) -> {
132                         if (ex != null) {
133                             logger.warn("Failed to subscribe {} to discovery topic {} on broker {}", listener, topic,
134                                     thing.getUID());
135                         } else {
136                             logger.trace("Subscribed {} to discovery topic {} on broker {}", listener, topic,
137                                     thing.getUID());
138                         }
139                         return null;
140                     });
141                 }
142                 return topicSubscribe;
143             });
144         });
145     }
146
147     @Override
148     public void connectionStateChanged(MqttConnectionState state, @Nullable Throwable error) {
149         if (state == MqttConnectionState.CONNECTED) {
150             updateStatus(ThingStatus.ONLINE);
151             channelStateByChannelUID.values().forEach(PublishTriggerChannel::start);
152         } else {
153             channelStateByChannelUID.values().forEach(PublishTriggerChannel::stop);
154             if (error == null) {
155                 updateStatus(ThingStatus.OFFLINE);
156             } else {
157                 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR, error.getMessage());
158             }
159         }
160     }
161
162     @Override
163     protected void triggerChannel(ChannelUID channelUID, String event) {
164         super.triggerChannel(channelUID, event);
165     }
166
167     /**
168      * Removes listeners to the {@link MqttBrokerConnection}.
169      */
170     @Override
171     public void dispose() {
172         channelStateByChannelUID.values().forEach(PublishTriggerChannel::stop);
173         channelStateByChannelUID.clear();
174
175         // keep topics, but stop subscriptions
176         discoveryTopics.forEach((topic, listenerMap) -> {
177             listenerMap.forEach((listener, topicSubscribe) -> {
178                 topicSubscribe.stop();
179             });
180         });
181
182         if (connection != null) {
183             connection.removeConnectionObserver(this);
184         } else {
185             logger.warn("Trying to dispose handler {} but connection is already null. Most likely this is a bug.",
186                     thing.getUID());
187         }
188         this.connection = null;
189         connectionFuture = new CompletableFuture<>();
190         super.dispose();
191     }
192
193     /**
194      * register a discovery listener to a specified topic on this broker (used by the handler factory)
195      *
196      * @param listener the discovery participant that wishes to be notified about this topic
197      * @param topic the topic (wildcards supported)
198      */
199     public final void registerDiscoveryListener(MQTTTopicDiscoveryParticipant listener, String topic) {
200         Map<MQTTTopicDiscoveryParticipant, @Nullable TopicSubscribe> topicListeners = discoveryTopics
201                 .computeIfAbsent(topic, t -> new HashMap<>());
202         topicListeners.compute(listener, (k, v) -> {
203             if (v != null) {
204                 logger.warn("Duplicate subscription for {} to discovery topic {} on broker {}. Check discovery logic!",
205                         listener, topic, thing.getUID());
206                 v.stop();
207             }
208
209             TopicSubscribe topicSubscribe = new TopicSubscribe(connection, topic, listener, thing.getUID());
210             if (discoveryEnabled()) {
211                 topicSubscribe.start().handle((result, ex) -> {
212                     if (ex != null) {
213                         logger.warn("Failed to subscribe {} to discovery topic {} on broker {}", listener, topic,
214                                 thing.getUID());
215                     } else {
216                         logger.trace("Subscribed {} to discovery topic {} on broker {}", listener, topic,
217                                 thing.getUID());
218                     }
219                     return null;
220                 });
221             }
222             return topicSubscribe;
223         });
224     }
225
226     /**
227      * unregisters a discovery listener from a specified topic on this broker (used by the handler factory)
228      *
229      * @param listener the discovery participant that wishes no notifications about this topic
230      * @param topic the topic (as specified during registration)
231      */
232     public final void unregisterDiscoveryListener(MQTTTopicDiscoveryParticipant listener, String topic) {
233         discoveryTopics.compute(topic, (k, v) -> {
234             if (v == null) {
235                 logger.warn(
236                         "Tried to unsubscribe {} from  discovery topic {} on broker {} but topic not registered at all. Check discovery logic!",
237                         listener, topic, thing.getUID());
238                 return null;
239             }
240             v.compute(listener, (l, w) -> {
241                 if (w == null) {
242                     logger.warn(
243                             "Tried to unsubscribe {} from  discovery topic {} on broker {} but topic not registered for listener. Check discovery logic!",
244                             listener, topic, thing.getUID());
245                 } else {
246                     w.stop();
247                     logger.trace("Unsubscribed {} from discovery topic {} on broker {}", listener, topic,
248                             thing.getUID());
249                 }
250                 return null;
251             });
252             return v.isEmpty() ? null : v;
253         });
254     }
255
256     /**
257      * check whether discovery is disabled on this broker
258      *
259      * @return true if discovery disabled
260      */
261     public abstract boolean discoveryEnabled();
262 }