2 * Copyright (c) 2010-2021 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.nikohomecontrol.internal.protocol.nhc2;
15 import java.io.ByteArrayInputStream;
16 import java.io.IOException;
17 import java.io.InputStream;
18 import java.nio.charset.StandardCharsets;
19 import java.security.KeyStore;
20 import java.security.KeyStoreException;
21 import java.security.NoSuchAlgorithmException;
22 import java.security.cert.CertificateException;
23 import java.security.cert.CertificateFactory;
24 import java.security.cert.X509Certificate;
25 import java.util.ResourceBundle;
26 import java.util.concurrent.CompletableFuture;
27 import java.util.concurrent.ExecutionException;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.TimeoutException;
31 import javax.net.ssl.TrustManager;
32 import javax.net.ssl.TrustManagerFactory;
34 import org.eclipse.jdt.annotation.NonNullByDefault;
35 import org.eclipse.jdt.annotation.Nullable;
36 import org.openhab.core.io.transport.mqtt.MqttActionCallback;
37 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
38 import org.openhab.core.io.transport.mqtt.MqttConnectionObserver;
39 import org.openhab.core.io.transport.mqtt.MqttConnectionState;
40 import org.openhab.core.io.transport.mqtt.MqttException;
41 import org.openhab.core.io.transport.mqtt.MqttMessageSubscriber;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
46 * {@link NhcMqttConnection2} manages the MQTT connection to the Connected Controller. It allows receiving state
47 * information about specific devices and sending updates to specific devices.
49 * @author Mark Herwege - Initial Contribution
52 public class NhcMqttConnection2 implements MqttActionCallback {
54 private final Logger logger = LoggerFactory.getLogger(NhcMqttConnection2.class);
56 private volatile @Nullable MqttBrokerConnection mqttConnection;
58 private volatile @Nullable CompletableFuture<Boolean> subscribedFuture;
59 private volatile @Nullable CompletableFuture<Boolean> stoppedFuture;
61 private MqttMessageSubscriber messageSubscriber;
62 private MqttConnectionObserver connectionObserver;
64 private TrustManager trustManagers[];
65 private String clientId;
67 private volatile String cocoAddress = "";
68 private volatile int port;
69 private volatile String profile = "";
70 private volatile String token = "";
72 NhcMqttConnection2(String clientId, MqttMessageSubscriber messageSubscriber,
73 MqttConnectionObserver connectionObserver) throws CertificateException {
74 trustManagers = getTrustManagers();
75 this.clientId = clientId;
76 this.messageSubscriber = messageSubscriber;
77 this.connectionObserver = connectionObserver;
80 private TrustManager[] getTrustManagers() throws CertificateException {
81 ResourceBundle certificatesBundle = ResourceBundle.getBundle("nikohomecontrol/certificates");
84 // Load server public certificates into key store
85 CertificateFactory cf = CertificateFactory.getInstance("X509");
86 InputStream certificateStream;
87 final KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType());
88 keyStore.load(null, null);
89 for (String certName : certificatesBundle.keySet()) {
90 certificateStream = new ByteArrayInputStream(
91 certificatesBundle.getString(certName).getBytes(StandardCharsets.UTF_8));
92 X509Certificate certificate = (X509Certificate) cf.generateCertificate(certificateStream);
93 keyStore.setCertificateEntry(certName, certificate);
96 ResourceBundle.clearCache();
98 // Create trust managers used to validate server
99 TrustManagerFactory tmFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
100 tmFactory.init(keyStore);
101 return tmFactory.getTrustManagers();
102 } catch (CertificateException | KeyStoreException | NoSuchAlgorithmException | IOException e) {
103 logger.debug("error with SSL context creation: {} ", e.getMessage());
104 throw new CertificateException("SSL context creation exception", e);
106 ResourceBundle.clearCache();
111 * Start a secure MQTT connection and subscribe to all topics.
113 * @param subscriber MqttMessageSubscriber that will handle received messages
114 * @param cocoAddress IP Address of the Niko Connected Controller
115 * @param port Port for MQTT communication with the Niko Connected Controller
116 * @param token JWT token for the hobby profile
117 * @throws MqttException
119 synchronized void startConnection(String cocoAddress, int port, String profile, String token) throws MqttException {
120 CompletableFuture<Boolean> future = stoppedFuture;
121 if (future != null) {
123 future.get(5000, TimeUnit.MILLISECONDS);
124 logger.debug("finished stopping connection");
125 } catch (InterruptedException | ExecutionException | TimeoutException ignore) {
126 logger.debug("error stopping connection");
128 stoppedFuture = null;
131 logger.debug("starting connection...");
132 this.cocoAddress = cocoAddress;
134 this.profile = profile;
136 MqttBrokerConnection connection = createMqttConnection();
137 connection.addConnectionObserver(connectionObserver);
138 mqttConnection = connection;
140 if (connection.start().get(5000, TimeUnit.MILLISECONDS)) {
141 if (subscribedFuture == null) {
142 subscribedFuture = connection.subscribe("#", messageSubscriber);
145 logger.debug("error connecting");
146 throw new MqttException("Connection execution exception");
148 } catch (InterruptedException e) {
149 logger.debug("connection interrupted exception");
150 throw new MqttException("Connection interrupted exception");
151 } catch (ExecutionException e) {
152 logger.debug("connection execution exception", e.getCause());
153 throw new MqttException("Connection execution exception");
154 } catch (TimeoutException e) {
155 logger.debug("connection timeout exception");
156 throw new MqttException("Connection timeout exception");
160 private MqttBrokerConnection createMqttConnection() throws MqttException {
161 MqttBrokerConnection connection = new MqttBrokerConnection(cocoAddress, port, true, false, clientId);
162 connection.setTrustManagers(trustManagers);
163 connection.setCredentials(profile, token);
164 connection.setQos(1);
169 * Stop the MQTT connection.
171 void stopConnection() {
172 logger.debug("stopping connection...");
173 MqttBrokerConnection connection = mqttConnection;
174 if (connection != null) {
175 connection.removeConnectionObserver(connectionObserver);
177 stoppedFuture = stopConnection(connection);
178 mqttConnection = null;
180 CompletableFuture<Boolean> future = subscribedFuture;
181 if (future != null) {
182 future.complete(false);
183 subscribedFuture = null;
187 private CompletableFuture<Boolean> stopConnection(@Nullable MqttBrokerConnection connection) {
188 if (connection != null) {
189 return connection.stop();
191 return CompletableFuture.completedFuture(true);
196 * @return true if connection established and subscribed to all topics
198 private boolean isConnected() {
199 MqttBrokerConnection connection = mqttConnection;
200 CompletableFuture<Boolean> future = subscribedFuture;
202 if (connection != null) {
204 if ((future != null) && future.get(5000, TimeUnit.MILLISECONDS)) {
205 MqttConnectionState state = connection.connectionState();
206 logger.debug("connection state {} for {}", state, connection.getClientId());
207 return state == MqttConnectionState.CONNECTED;
209 } catch (InterruptedException | ExecutionException | TimeoutException e) {
217 * Publish a message on the general connection.
221 * @throws MqttException
223 void connectionPublish(String topic, String payload) throws MqttException {
224 MqttBrokerConnection connection = mqttConnection;
225 if (connection == null) {
226 logger.debug("cannot publish, no connection");
227 throw new MqttException("No connection exception");
231 logger.debug("publish {}, {}", topic, payload);
232 connection.publish(topic, payload.getBytes(), connection.getQos(), false);
234 logger.debug("cannot publish, not subscribed to connection messages");
239 public void onSuccess(String topic) {
240 logger.debug("publish succeeded {}", topic);
244 public void onFailure(String topic, Throwable error) {
245 logger.debug("publish failed {}, {}", topic, error.getMessage(), error);