]> git.basschouten.com Git - openhab-addons.git/blob
ea7efefec62369451e1258791ccb281e70f0dbc7
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2022 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.irobot.internal.handler;
14
15 import static java.nio.charset.StandardCharsets.UTF_8;
16 import static org.openhab.binding.irobot.internal.IRobotBindingConstants.MQTT_PORT;
17 import static org.openhab.binding.irobot.internal.IRobotBindingConstants.TRUST_MANAGERS;
18
19 import java.io.IOException;
20 import java.net.InetAddress;
21 import java.net.UnknownHostException;
22 import java.util.concurrent.CompletableFuture;
23 import java.util.concurrent.ExecutionException;
24 import java.util.concurrent.Future;
25 import java.util.concurrent.TimeUnit;
26 import java.util.concurrent.TimeoutException;
27
28 import org.eclipse.jdt.annotation.NonNullByDefault;
29 import org.eclipse.jdt.annotation.Nullable;
30 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
31 import org.openhab.core.io.transport.mqtt.MqttConnectionObserver;
32 import org.openhab.core.io.transport.mqtt.MqttConnectionState;
33 import org.openhab.core.io.transport.mqtt.MqttMessageSubscriber;
34 import org.openhab.core.io.transport.mqtt.reconnect.PeriodicReconnectStrategy;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
37
38 /**
39  * The {@link IRobotConnectionHandler} is responsible for handling iRobot MQTT connection.
40  *
41  * @author hkuhn42 - Initial contribution
42  * @author Pavel Fedin - Rewrite for 900 series
43  * @author Alexander Falkenstern - Add support for I7 series
44  */
45 @NonNullByDefault
46 public abstract class IRobotConnectionHandler implements MqttConnectionObserver, MqttMessageSubscriber {
47     private final Logger logger = LoggerFactory.getLogger(IRobotConnectionHandler.class);
48
49     private static final int RECONNECT_DELAY = 10000; // In milliseconds
50     private @Nullable Future<?> reconnect;
51     private @Nullable MqttBrokerConnection connection;
52
53     public IRobotConnectionHandler() {
54     }
55
56     public synchronized void connect(final String ip, final String blid, final String password) {
57         InetAddress host = null;
58         try {
59             host = InetAddress.getByName(ip);
60         } catch (UnknownHostException exception) {
61             connectionStateChanged(MqttConnectionState.DISCONNECTED, exception);
62             return;
63         }
64
65         try {
66             boolean reachable = host.isReachable(1000);
67             if (logger.isTraceEnabled()) {
68                 logger.trace("Connection to {} can be established {}", ip, reachable);
69             }
70         } catch (IOException exception) {
71             connectionStateChanged(MqttConnectionState.DISCONNECTED, exception);
72             return;
73         }
74
75         // BLID is used as both client ID and username. The name of BLID also came from Roomba980-python
76         MqttBrokerConnection connection = new MqttBrokerConnection(ip, MQTT_PORT, true, blid);
77
78         // Disable sending UNSUBSCRIBE request before disconnecting because Roomba doesn't like it.
79         // It just swallows the request and never sends any response, so stop() method never completes.
80         connection.setUnsubscribeOnStop(false);
81         connection.setCredentials(blid, password);
82         connection.setTrustManagers(TRUST_MANAGERS);
83
84         // Roomba accepts MQTT qos 0 (AT_MOST_ONCE) only.
85         connection.setQos(0);
86
87         // MQTT connection reconnects itself, so we don't have to reconnect, when it breaks
88         connection.setReconnectStrategy(new PeriodicReconnectStrategy(RECONNECT_DELAY, RECONNECT_DELAY));
89
90         connection.start().exceptionally(exception -> {
91             connectionStateChanged(MqttConnectionState.DISCONNECTED, exception);
92             return false;
93         }).thenAccept(successful -> {
94             MqttConnectionState state = successful ? MqttConnectionState.CONNECTED : MqttConnectionState.DISCONNECTED;
95             connectionStateChanged(state, successful ? null : new TimeoutException("Timeout"));
96         });
97
98         this.connection = connection;
99     }
100
101     public synchronized void disconnect() {
102         Future<?> reconnect = this.reconnect;
103         if (reconnect != null) {
104             reconnect.cancel(false);
105             this.reconnect = null;
106         }
107
108         MqttBrokerConnection connection = this.connection;
109         if (connection != null) {
110             connection.unsubscribe("#", this);
111             CompletableFuture<Boolean> future = connection.stop();
112             try {
113                 future.get(10, TimeUnit.SECONDS);
114                 if (logger.isTraceEnabled()) {
115                     logger.trace("MQTT disconnect successful");
116                 }
117             } catch (InterruptedException | ExecutionException | TimeoutException exception) {
118                 logger.warn("MQTT disconnect failed: {}", exception.getMessage());
119             }
120             this.connection = null;
121         }
122     }
123
124     @Override
125     public void connectionStateChanged(MqttConnectionState state, @Nullable Throwable error) {
126         if (state == MqttConnectionState.CONNECTED) {
127             MqttBrokerConnection connection = this.connection;
128
129             // This would be very strange, but Eclipse forces us to do the check
130             if (connection != null) {
131                 reconnect = null;
132
133                 // Roomba sends us two topics:
134                 // "wifistat" - reports signal strength and current robot position
135                 // "$aws/things/<BLID>/shadow/update" - the rest of messages
136                 // Subscribe to everything since we're interested in both
137                 connection.subscribe("#", this).exceptionally(exception -> {
138                     logger.warn("MQTT subscription failed: {}", exception.getMessage());
139                     return false;
140                 }).thenAccept(successful -> {
141                     if (successful && logger.isTraceEnabled()) {
142                         logger.trace("MQTT subscription successful");
143                     } else {
144                         logger.warn("MQTT subscription failed: Timeout");
145                     }
146                 });
147             } else {
148                 logger.warn("Established connection without broker pointer");
149             }
150         } else {
151             String message = (error != null) ? error.getMessage() : "Unknown reason";
152             logger.warn("MQTT connection failed: {}", message);
153         }
154     }
155
156     @Override
157     public void processMessage(String topic, byte[] payload) {
158         // Report raw JSON reply
159         final String json = new String(payload, UTF_8);
160         if (logger.isTraceEnabled()) {
161             logger.trace("Got topic {} data {}", topic, json);
162         }
163
164         receive(topic, json);
165     }
166
167     public abstract void receive(final String topic, final String json);
168
169     public void send(final String topic, final String payload) {
170         MqttBrokerConnection connection = this.connection;
171         if (connection != null) {
172             if (logger.isTraceEnabled()) {
173                 logger.trace("Sending {}: {}", topic, payload);
174             }
175             connection.publish(topic, payload.getBytes(UTF_8), connection.getQos(), false);
176         }
177     }
178 }