]> git.basschouten.com Git - openhab-addons.git/blob
28bc58a04b8a1b2b998765369a15f436b2827e31
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2023 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.nikohomecontrol.internal.protocol.nhc2;
14
15 import java.io.ByteArrayInputStream;
16 import java.io.IOException;
17 import java.io.InputStream;
18 import java.nio.charset.StandardCharsets;
19 import java.security.KeyStore;
20 import java.security.KeyStoreException;
21 import java.security.NoSuchAlgorithmException;
22 import java.security.cert.CertificateException;
23 import java.security.cert.CertificateFactory;
24 import java.security.cert.X509Certificate;
25 import java.util.ResourceBundle;
26 import java.util.concurrent.CompletableFuture;
27 import java.util.concurrent.ExecutionException;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.TimeoutException;
30
31 import javax.net.ssl.TrustManager;
32 import javax.net.ssl.TrustManagerFactory;
33
34 import org.eclipse.jdt.annotation.NonNullByDefault;
35 import org.eclipse.jdt.annotation.Nullable;
36 import org.openhab.core.io.transport.mqtt.MqttActionCallback;
37 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
38 import org.openhab.core.io.transport.mqtt.MqttConnectionObserver;
39 import org.openhab.core.io.transport.mqtt.MqttConnectionState;
40 import org.openhab.core.io.transport.mqtt.MqttException;
41 import org.openhab.core.io.transport.mqtt.MqttMessageSubscriber;
42 import org.openhab.core.io.transport.mqtt.reconnect.AbstractReconnectStrategy;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45
46 /**
47  * {@link NhcMqttConnection2} manages the MQTT connection to the Connected Controller. It allows receiving state
48  * information about specific devices and sending updates to specific devices.
49  *
50  * @author Mark Herwege - Initial Contribution
51  */
52 @NonNullByDefault
53 public class NhcMqttConnection2 implements MqttActionCallback {
54
55     private final Logger logger = LoggerFactory.getLogger(NhcMqttConnection2.class);
56
57     private volatile @Nullable MqttBrokerConnection mqttConnection;
58
59     private volatile @Nullable CompletableFuture<Boolean> subscribedFuture;
60     private volatile @Nullable CompletableFuture<Boolean> stoppedFuture;
61
62     private MqttMessageSubscriber messageSubscriber;
63     private MqttConnectionObserver connectionObserver;
64
65     private TrustManager[] trustManagers;
66     private String clientId;
67
68     private volatile String cocoAddress = "";
69     private volatile int port;
70     private volatile String profile = "";
71     private volatile String token = "";
72
73     NhcMqttConnection2(String clientId, MqttMessageSubscriber messageSubscriber,
74             MqttConnectionObserver connectionObserver) throws CertificateException {
75         trustManagers = getTrustManagers();
76         this.clientId = clientId;
77         this.messageSubscriber = messageSubscriber;
78         this.connectionObserver = connectionObserver;
79     }
80
81     private TrustManager[] getTrustManagers() throws CertificateException {
82         ResourceBundle certificatesBundle = ResourceBundle.getBundle("nikohomecontrol/certificates");
83
84         try {
85             // Load server public certificates into key store
86             CertificateFactory cf = CertificateFactory.getInstance("X509");
87             InputStream certificateStream;
88             final KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType());
89             keyStore.load(null, null);
90             for (String certName : certificatesBundle.keySet()) {
91                 certificateStream = new ByteArrayInputStream(
92                         certificatesBundle.getString(certName).getBytes(StandardCharsets.UTF_8));
93                 X509Certificate certificate = (X509Certificate) cf.generateCertificate(certificateStream);
94                 keyStore.setCertificateEntry(certName, certificate);
95             }
96
97             ResourceBundle.clearCache();
98
99             // Create trust managers used to validate server
100             TrustManagerFactory tmFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
101             tmFactory.init(keyStore);
102             return tmFactory.getTrustManagers();
103         } catch (CertificateException | KeyStoreException | NoSuchAlgorithmException | IOException e) {
104             logger.debug("error with SSL context creation: {} ", e.getMessage());
105             throw new CertificateException("SSL context creation exception", e);
106         } finally {
107             ResourceBundle.clearCache();
108         }
109     }
110
111     /**
112      * Start a secure MQTT connection and subscribe to all topics.
113      *
114      * @param subscriber MqttMessageSubscriber that will handle received messages
115      * @param cocoAddress IP Address of the Niko Connected Controller
116      * @param port Port for MQTT communication with the Niko Connected Controller
117      * @param token JWT token for the hobby profile
118      * @throws MqttException
119      */
120     synchronized void startConnection(String cocoAddress, int port, String profile, String token) throws MqttException {
121         CompletableFuture<Boolean> future = stoppedFuture;
122         if (future != null) {
123             try {
124                 future.get(5000, TimeUnit.MILLISECONDS);
125                 logger.debug("finished stopping connection");
126             } catch (InterruptedException | ExecutionException | TimeoutException ignore) {
127                 logger.debug("error stopping connection");
128             }
129             stoppedFuture = null;
130         }
131
132         logger.debug("starting connection...");
133         this.cocoAddress = cocoAddress;
134         this.port = port;
135         this.profile = profile;
136         this.token = token;
137         MqttBrokerConnection connection = createMqttConnection();
138         connection.addConnectionObserver(connectionObserver);
139         mqttConnection = connection;
140         try {
141             if (connection.start().get(5000, TimeUnit.MILLISECONDS)) {
142                 if (subscribedFuture == null) {
143                     subscribedFuture = connection.subscribe("#", messageSubscriber);
144                 }
145             } else {
146                 logger.debug("error connecting");
147                 throw new MqttException("Connection execution exception");
148             }
149         } catch (InterruptedException e) {
150             logger.debug("connection interrupted exception");
151             throw new MqttException("Connection interrupted exception");
152         } catch (ExecutionException e) {
153             logger.debug("connection execution exception", e.getCause());
154             throw new MqttException("Connection execution exception");
155         } catch (TimeoutException e) {
156             logger.debug("connection timeout exception");
157             throw new MqttException("Connection timeout exception");
158         }
159     }
160
161     private MqttBrokerConnection createMqttConnection() throws MqttException {
162         MqttBrokerConnection connection = new MqttBrokerConnection(cocoAddress, port, true, false, clientId);
163         connection.setTrustManagers(trustManagers);
164         connection.setCredentials(profile, token);
165         connection.setQos(1);
166
167         // Don't use the transport periodic reconnect strategy. It doesn't restart the initialization when the
168         // connection is lost and creates extra threads that do not get cleaned up. Just stop it.
169         AbstractReconnectStrategy reconnectStrategy = connection.getReconnectStrategy();
170         if (reconnectStrategy != null) {
171             reconnectStrategy.stop();
172         }
173         return connection;
174     }
175
176     /**
177      * Stop the MQTT connection.
178      */
179     void stopConnection() {
180         logger.debug("stopping connection...");
181         MqttBrokerConnection connection = mqttConnection;
182         if (connection != null) {
183             connection.removeConnectionObserver(connectionObserver);
184         }
185         stoppedFuture = stopConnection(connection);
186         mqttConnection = null;
187
188         CompletableFuture<Boolean> future = subscribedFuture;
189         if (future != null) {
190             future.complete(false);
191             subscribedFuture = null;
192         }
193     }
194
195     private CompletableFuture<Boolean> stopConnection(@Nullable MqttBrokerConnection connection) {
196         if (connection != null) {
197             return connection.stop();
198         } else {
199             return CompletableFuture.completedFuture(true);
200         }
201     }
202
203     /**
204      * @return true if connection established and subscribed to all topics
205      */
206     private boolean isConnected() {
207         MqttBrokerConnection connection = mqttConnection;
208         CompletableFuture<Boolean> future = subscribedFuture;
209
210         if (connection != null) {
211             try {
212                 if ((future != null) && future.get(5000, TimeUnit.MILLISECONDS)) {
213                     MqttConnectionState state = connection.connectionState();
214                     logger.debug("connection state {} for {}", state, connection.getClientId());
215                     return state == MqttConnectionState.CONNECTED;
216                 }
217             } catch (InterruptedException | ExecutionException | TimeoutException e) {
218                 return false;
219             }
220         }
221         return false;
222     }
223
224     /**
225      * Publish a message on the general connection.
226      *
227      * @param topic
228      * @param payload
229      * @throws MqttException
230      */
231     void connectionPublish(String topic, String payload) throws MqttException {
232         MqttBrokerConnection connection = mqttConnection;
233         if (connection == null) {
234             logger.debug("cannot publish, no connection");
235             throw new MqttException("No connection exception");
236         }
237
238         if (isConnected()) {
239             logger.debug("publish {}, {}", topic, payload);
240             connection.publish(topic, payload.getBytes(), connection.getQos(), false);
241         } else {
242             logger.debug("cannot publish, not subscribed to connection messages");
243         }
244     }
245
246     @Override
247     public void onSuccess(String topic) {
248         logger.debug("publish succeeded {}", topic);
249     }
250
251     @Override
252     public void onFailure(String topic, Throwable error) {
253         logger.debug("publish failed {}, {}", topic, error.getMessage(), error);
254     }
255 }