]> git.basschouten.com Git - openhab-addons.git/blob
1487d98317f7acba653e7a379bd336c67e299006
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2022 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.nikohomecontrol.internal.protocol.nhc2;
14
15 import java.io.ByteArrayInputStream;
16 import java.io.IOException;
17 import java.io.InputStream;
18 import java.nio.charset.StandardCharsets;
19 import java.security.KeyStore;
20 import java.security.KeyStoreException;
21 import java.security.NoSuchAlgorithmException;
22 import java.security.cert.CertificateException;
23 import java.security.cert.CertificateFactory;
24 import java.security.cert.X509Certificate;
25 import java.util.ResourceBundle;
26 import java.util.concurrent.CompletableFuture;
27 import java.util.concurrent.ExecutionException;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.TimeoutException;
30
31 import javax.net.ssl.TrustManager;
32 import javax.net.ssl.TrustManagerFactory;
33
34 import org.eclipse.jdt.annotation.NonNullByDefault;
35 import org.eclipse.jdt.annotation.Nullable;
36 import org.openhab.core.io.transport.mqtt.MqttActionCallback;
37 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
38 import org.openhab.core.io.transport.mqtt.MqttConnectionObserver;
39 import org.openhab.core.io.transport.mqtt.MqttConnectionState;
40 import org.openhab.core.io.transport.mqtt.MqttException;
41 import org.openhab.core.io.transport.mqtt.MqttMessageSubscriber;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
44
45 /**
46  * {@link NhcMqttConnection2} manages the MQTT connection to the Connected Controller. It allows receiving state
47  * information about specific devices and sending updates to specific devices.
48  *
49  * @author Mark Herwege - Initial Contribution
50  */
51 @NonNullByDefault
52 public class NhcMqttConnection2 implements MqttActionCallback {
53
54     private final Logger logger = LoggerFactory.getLogger(NhcMqttConnection2.class);
55
56     private volatile @Nullable MqttBrokerConnection mqttConnection;
57
58     private volatile @Nullable CompletableFuture<Boolean> subscribedFuture;
59     private volatile @Nullable CompletableFuture<Boolean> stoppedFuture;
60
61     private MqttMessageSubscriber messageSubscriber;
62     private MqttConnectionObserver connectionObserver;
63
64     private TrustManager trustManagers[];
65     private String clientId;
66
67     private volatile String cocoAddress = "";
68     private volatile int port;
69     private volatile String profile = "";
70     private volatile String token = "";
71
72     NhcMqttConnection2(String clientId, MqttMessageSubscriber messageSubscriber,
73             MqttConnectionObserver connectionObserver) throws CertificateException {
74         trustManagers = getTrustManagers();
75         this.clientId = clientId;
76         this.messageSubscriber = messageSubscriber;
77         this.connectionObserver = connectionObserver;
78     }
79
80     private TrustManager[] getTrustManagers() throws CertificateException {
81         ResourceBundle certificatesBundle = ResourceBundle.getBundle("nikohomecontrol/certificates");
82
83         try {
84             // Load server public certificates into key store
85             CertificateFactory cf = CertificateFactory.getInstance("X509");
86             InputStream certificateStream;
87             final KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType());
88             keyStore.load(null, null);
89             for (String certName : certificatesBundle.keySet()) {
90                 certificateStream = new ByteArrayInputStream(
91                         certificatesBundle.getString(certName).getBytes(StandardCharsets.UTF_8));
92                 X509Certificate certificate = (X509Certificate) cf.generateCertificate(certificateStream);
93                 keyStore.setCertificateEntry(certName, certificate);
94             }
95
96             ResourceBundle.clearCache();
97
98             // Create trust managers used to validate server
99             TrustManagerFactory tmFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
100             tmFactory.init(keyStore);
101             return tmFactory.getTrustManagers();
102         } catch (CertificateException | KeyStoreException | NoSuchAlgorithmException | IOException e) {
103             logger.debug("error with SSL context creation: {} ", e.getMessage());
104             throw new CertificateException("SSL context creation exception", e);
105         } finally {
106             ResourceBundle.clearCache();
107         }
108     }
109
110     /**
111      * Start a secure MQTT connection and subscribe to all topics.
112      *
113      * @param subscriber MqttMessageSubscriber that will handle received messages
114      * @param cocoAddress IP Address of the Niko Connected Controller
115      * @param port Port for MQTT communication with the Niko Connected Controller
116      * @param token JWT token for the hobby profile
117      * @throws MqttException
118      */
119     synchronized void startConnection(String cocoAddress, int port, String profile, String token) throws MqttException {
120         CompletableFuture<Boolean> future = stoppedFuture;
121         if (future != null) {
122             try {
123                 future.get(5000, TimeUnit.MILLISECONDS);
124                 logger.debug("finished stopping connection");
125             } catch (InterruptedException | ExecutionException | TimeoutException ignore) {
126                 logger.debug("error stopping connection");
127             }
128             stoppedFuture = null;
129         }
130
131         logger.debug("starting connection...");
132         this.cocoAddress = cocoAddress;
133         this.port = port;
134         this.profile = profile;
135         this.token = token;
136         MqttBrokerConnection connection = createMqttConnection();
137         connection.addConnectionObserver(connectionObserver);
138         mqttConnection = connection;
139         try {
140             if (connection.start().get(5000, TimeUnit.MILLISECONDS)) {
141                 if (subscribedFuture == null) {
142                     subscribedFuture = connection.subscribe("#", messageSubscriber);
143                 }
144             } else {
145                 logger.debug("error connecting");
146                 throw new MqttException("Connection execution exception");
147             }
148         } catch (InterruptedException e) {
149             logger.debug("connection interrupted exception");
150             throw new MqttException("Connection interrupted exception");
151         } catch (ExecutionException e) {
152             logger.debug("connection execution exception", e.getCause());
153             throw new MqttException("Connection execution exception");
154         } catch (TimeoutException e) {
155             logger.debug("connection timeout exception");
156             throw new MqttException("Connection timeout exception");
157         }
158     }
159
160     private MqttBrokerConnection createMqttConnection() throws MqttException {
161         MqttBrokerConnection connection = new MqttBrokerConnection(cocoAddress, port, true, false, clientId);
162         connection.setTrustManagers(trustManagers);
163         connection.setCredentials(profile, token);
164         connection.setQos(1);
165         return connection;
166     }
167
168     /**
169      * Stop the MQTT connection.
170      */
171     void stopConnection() {
172         logger.debug("stopping connection...");
173         MqttBrokerConnection connection = mqttConnection;
174         if (connection != null) {
175             connection.removeConnectionObserver(connectionObserver);
176         }
177         stoppedFuture = stopConnection(connection);
178         mqttConnection = null;
179
180         CompletableFuture<Boolean> future = subscribedFuture;
181         if (future != null) {
182             future.complete(false);
183             subscribedFuture = null;
184         }
185     }
186
187     private CompletableFuture<Boolean> stopConnection(@Nullable MqttBrokerConnection connection) {
188         if (connection != null) {
189             return connection.stop();
190         } else {
191             return CompletableFuture.completedFuture(true);
192         }
193     }
194
195     /**
196      * @return true if connection established and subscribed to all topics
197      */
198     private boolean isConnected() {
199         MqttBrokerConnection connection = mqttConnection;
200         CompletableFuture<Boolean> future = subscribedFuture;
201
202         if (connection != null) {
203             try {
204                 if ((future != null) && future.get(5000, TimeUnit.MILLISECONDS)) {
205                     MqttConnectionState state = connection.connectionState();
206                     logger.debug("connection state {} for {}", state, connection.getClientId());
207                     return state == MqttConnectionState.CONNECTED;
208                 }
209             } catch (InterruptedException | ExecutionException | TimeoutException e) {
210                 return false;
211             }
212         }
213         return false;
214     }
215
216     /**
217      * Publish a message on the general connection.
218      *
219      * @param topic
220      * @param payload
221      * @throws MqttException
222      */
223     void connectionPublish(String topic, String payload) throws MqttException {
224         MqttBrokerConnection connection = mqttConnection;
225         if (connection == null) {
226             logger.debug("cannot publish, no connection");
227             throw new MqttException("No connection exception");
228         }
229
230         if (isConnected()) {
231             logger.debug("publish {}, {}", topic, payload);
232             connection.publish(topic, payload.getBytes(), connection.getQos(), false);
233         } else {
234             logger.debug("cannot publish, not subscribed to connection messages");
235         }
236     }
237
238     @Override
239     public void onSuccess(String topic) {
240         logger.debug("publish succeeded {}", topic);
241     }
242
243     @Override
244     public void onFailure(String topic, Throwable error) {
245         logger.debug("publish failed {}, {}", topic, error.getMessage(), error);
246     }
247 }