]> git.basschouten.com Git - openhab-addons.git/blob
6468e3437470ffc38abccf14d608f72096d9f24c
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2020 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.mqtt.handler;
14
15 import java.util.Collection;
16 import java.util.Collections;
17 import java.util.HashMap;
18 import java.util.Map;
19 import java.util.concurrent.CompletableFuture;
20 import java.util.concurrent.TimeoutException;
21
22 import org.eclipse.jdt.annotation.NonNullByDefault;
23 import org.eclipse.jdt.annotation.Nullable;
24 import org.openhab.binding.mqtt.discovery.MQTTTopicDiscoveryParticipant;
25 import org.openhab.binding.mqtt.discovery.TopicSubscribe;
26 import org.openhab.binding.mqtt.internal.action.MQTTActions;
27 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
28 import org.openhab.core.io.transport.mqtt.MqttConnectionObserver;
29 import org.openhab.core.io.transport.mqtt.MqttConnectionState;
30 import org.openhab.core.io.transport.mqtt.MqttService;
31 import org.openhab.core.thing.*;
32 import org.openhab.core.thing.binding.BaseBridgeHandler;
33 import org.openhab.core.thing.binding.ThingHandlerService;
34 import org.openhab.core.types.Command;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
37
38 /**
39  * This base implementation handles connection changes of the {@link MqttBrokerConnection}
40  * and puts the Thing on or offline. It also handles adding/removing notifications of the
41  * {@link MqttService} and provides a basic dispose() implementation.
42  *
43  * @author David Graeff - Initial contribution
44  */
45 @NonNullByDefault
46 public abstract class AbstractBrokerHandler extends BaseBridgeHandler implements MqttConnectionObserver {
47     public static final int TIMEOUT_DEFAULT = 1200; /* timeout in milliseconds */
48     private final Logger logger = LoggerFactory.getLogger(AbstractBrokerHandler.class);
49
50     final Map<ChannelUID, PublishTriggerChannel> channelStateByChannelUID = new HashMap<>();
51     private final Map<String, @Nullable Map<MQTTTopicDiscoveryParticipant, @Nullable TopicSubscribe>> discoveryTopics = new HashMap<>();
52
53     protected @Nullable MqttBrokerConnection connection;
54     protected CompletableFuture<MqttBrokerConnection> connectionFuture = new CompletableFuture<>();
55
56     public AbstractBrokerHandler(Bridge thing) {
57         super(thing);
58     }
59
60     @Override
61     public Collection<Class<? extends ThingHandlerService>> getServices() {
62         return Collections.singleton(MQTTActions.class);
63     }
64
65     /**
66      * Returns the underlying {@link MqttBrokerConnection} either immediately or after {@link #initialize()} has
67      * performed.
68      */
69     public CompletableFuture<MqttBrokerConnection> getConnectionAsync() {
70         return connectionFuture;
71     }
72
73     /**
74      * Returns the underlying {@link MqttBrokerConnection}.
75      */
76     public @Nullable MqttBrokerConnection getConnection() {
77         return connection;
78     }
79
80     /**
81      * Does nothing in the base implementation.
82      */
83     @Override
84     public void handleCommand(ChannelUID channelUID, Command command) {
85         // No commands to handle
86     }
87
88     /**
89      * Registers a connection status listener and attempts a connection if there is none so far.
90      */
91     @Override
92     public void initialize() {
93         final MqttBrokerConnection connection = this.connection;
94         if (connection == null) {
95             logger.warn("Trying to initialize {} but connection is null. This is most likely a bug.", thing.getUID());
96             return;
97         }
98         for (Channel channel : thing.getChannels()) {
99             final PublishTriggerChannelConfig channelConfig = channel.getConfiguration()
100                     .as(PublishTriggerChannelConfig.class);
101             PublishTriggerChannel c = new PublishTriggerChannel(channelConfig, channel.getUID(), connection, this);
102             channelStateByChannelUID.put(channel.getUID(), c);
103         }
104
105         connection.addConnectionObserver(this);
106
107         connection.start().exceptionally(e -> {
108             connectionStateChanged(MqttConnectionState.DISCONNECTED, e);
109             return false;
110         }).thenAccept(v -> {
111             if (!v) {
112                 connectionStateChanged(MqttConnectionState.DISCONNECTED, new TimeoutException("Timeout"));
113             } else {
114                 connectionStateChanged(MqttConnectionState.CONNECTED, null);
115             }
116         });
117         connectionFuture.complete(connection);
118
119         discoveryTopics.forEach((topic, listenerMap) -> {
120             listenerMap.replaceAll((listener, oldTopicSubscribe) -> {
121                 if (oldTopicSubscribe.isStarted()) {
122                     oldTopicSubscribe.stop();
123                 }
124
125                 TopicSubscribe topicSubscribe = new TopicSubscribe(connection, topic, listener, thing.getUID());
126                 if (discoveryEnabled()) {
127                     topicSubscribe.start().handle((result, ex) -> {
128                         if (ex != null) {
129                             logger.warn("Failed to subscribe {} to discovery topic {} on broker {}", listener, topic,
130                                     thing.getUID());
131                         } else {
132                             logger.trace("Subscribed {} to discovery topic {} on broker {}", listener, topic,
133                                     thing.getUID());
134                         }
135                         return null;
136                     });
137                 }
138                 return topicSubscribe;
139             });
140         });
141     }
142
143     @Override
144     public void connectionStateChanged(MqttConnectionState state, @Nullable Throwable error) {
145         if (state == MqttConnectionState.CONNECTED) {
146             updateStatus(ThingStatus.ONLINE);
147             channelStateByChannelUID.values().forEach(PublishTriggerChannel::start);
148         } else {
149             channelStateByChannelUID.values().forEach(PublishTriggerChannel::stop);
150             if (error == null) {
151                 updateStatus(ThingStatus.OFFLINE);
152             } else {
153                 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR, error.getMessage());
154             }
155         }
156     }
157
158     @Override
159     protected void triggerChannel(ChannelUID channelUID, String event) {
160         super.triggerChannel(channelUID, event);
161     }
162
163     /**
164      * Removes listeners to the {@link MqttBrokerConnection}.
165      */
166     @Override
167     public void dispose() {
168         channelStateByChannelUID.values().forEach(PublishTriggerChannel::stop);
169         channelStateByChannelUID.clear();
170
171         // keep topics, but stop subscriptions
172         discoveryTopics.forEach((topic, listenerMap) -> {
173             listenerMap.forEach((listener, topicSubscribe) -> {
174                 topicSubscribe.stop();
175             });
176         });
177
178         if (connection != null) {
179             connection.removeConnectionObserver(this);
180         } else {
181             logger.warn("Trying to dispose handler {} but connection is already null. Most likely this is a bug.",
182                     thing.getUID());
183         }
184         this.connection = null;
185         connectionFuture = new CompletableFuture<>();
186         super.dispose();
187     }
188
189     /**
190      * register a discovery listener to a specified topic on this broker (used by the handler factory)
191      *
192      * @param listener the discovery participant that wishes to be notified about this topic
193      * @param topic the topic (wildcards supported)
194      */
195     public final void registerDiscoveryListener(MQTTTopicDiscoveryParticipant listener, String topic) {
196         Map<MQTTTopicDiscoveryParticipant, @Nullable TopicSubscribe> topicListeners = discoveryTopics
197                 .computeIfAbsent(topic, t -> new HashMap<>());
198         topicListeners.compute(listener, (k, v) -> {
199             if (v != null) {
200                 logger.warn("Duplicate subscription for {} to discovery topic {} on broker {}. Check discovery logic!",
201                         listener, topic, thing.getUID());
202                 v.stop();
203             }
204
205             TopicSubscribe topicSubscribe = new TopicSubscribe(connection, topic, listener, thing.getUID());
206             if (discoveryEnabled()) {
207                 topicSubscribe.start().handle((result, ex) -> {
208                     if (ex != null) {
209                         logger.warn("Failed to subscribe {} to discovery topic {} on broker {}", listener, topic,
210                                 thing.getUID());
211                     } else {
212                         logger.trace("Subscribed {} to discovery topic {} on broker {}", listener, topic,
213                                 thing.getUID());
214                     }
215                     return null;
216                 });
217             }
218             return topicSubscribe;
219         });
220     }
221
222     /**
223      * unregisters a discovery listener from a specified topic on this broker (used by the handler factory)
224      *
225      * @param listener the discovery participant that wishes no notifications about this topic
226      * @param topic the topic (as specified during registration)
227      */
228     public final void unregisterDiscoveryListener(MQTTTopicDiscoveryParticipant listener, String topic) {
229         Map<MQTTTopicDiscoveryParticipant, @Nullable TopicSubscribe> topicListeners = discoveryTopics.compute(topic,
230                 (k, v) -> {
231                     if (v == null) {
232                         logger.warn(
233                                 "Tried to unsubscribe {} from  discovery topic {} on broker {} but topic not registered at all. Check discovery logic!",
234                                 listener, topic, thing.getUID());
235                         return null;
236                     }
237                     v.compute(listener, (l, w) -> {
238                         if (w == null) {
239                             logger.warn(
240                                     "Tried to unsubscribe {} from  discovery topic {} on broker {} but topic not registered for listener. Check discovery logic!",
241                                     listener, topic, thing.getUID());
242                         } else {
243                             w.stop();
244                             logger.trace("Unsubscribed {} from discovery topic {} on broker {}", listener, topic,
245                                     thing.getUID());
246                         }
247                         return null;
248                     });
249                     return v.isEmpty() ? null : v;
250                 });
251     }
252
253     /**
254      * check whether discovery is disabled on this broker
255      *
256      * @return true if discovery disabled
257      */
258     public abstract boolean discoveryEnabled();
259 }