2 * Copyright (c) 2010-2024 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.nikohomecontrol.internal.protocol.nhc2;
15 import java.io.ByteArrayInputStream;
16 import java.io.IOException;
17 import java.io.InputStream;
18 import java.nio.charset.StandardCharsets;
19 import java.security.KeyStore;
20 import java.security.KeyStoreException;
21 import java.security.NoSuchAlgorithmException;
22 import java.security.cert.CertificateException;
23 import java.security.cert.CertificateFactory;
24 import java.security.cert.X509Certificate;
25 import java.util.ResourceBundle;
26 import java.util.concurrent.CompletableFuture;
27 import java.util.concurrent.ExecutionException;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.TimeoutException;
31 import javax.net.ssl.TrustManager;
32 import javax.net.ssl.TrustManagerFactory;
34 import org.eclipse.jdt.annotation.NonNullByDefault;
35 import org.eclipse.jdt.annotation.Nullable;
36 import org.openhab.core.io.transport.mqtt.MqttActionCallback;
37 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
38 import org.openhab.core.io.transport.mqtt.MqttConnectionObserver;
39 import org.openhab.core.io.transport.mqtt.MqttConnectionState;
40 import org.openhab.core.io.transport.mqtt.MqttException;
41 import org.openhab.core.io.transport.mqtt.MqttMessageSubscriber;
42 import org.openhab.core.io.transport.mqtt.reconnect.AbstractReconnectStrategy;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
47 * {@link NhcMqttConnection2} manages the MQTT connection to the Connected Controller. It allows receiving state
48 * information about specific devices and sending updates to specific devices.
50 * @author Mark Herwege - Initial Contribution
53 public class NhcMqttConnection2 implements MqttActionCallback {
55 private final Logger logger = LoggerFactory.getLogger(NhcMqttConnection2.class);
57 private volatile @Nullable MqttBrokerConnection mqttConnection;
59 private volatile @Nullable CompletableFuture<Boolean> subscribedFuture;
60 private volatile @Nullable CompletableFuture<Boolean> stoppedFuture;
62 private final MqttMessageSubscriber messageSubscriber;
63 private final MqttConnectionObserver connectionObserver;
65 private TrustManager[] trustManagers;
66 private String clientId;
68 private volatile String cocoAddress = "";
69 private volatile int port;
70 private volatile String profile = "";
71 private volatile String token = "";
73 NhcMqttConnection2(String clientId, MqttMessageSubscriber messageSubscriber,
74 MqttConnectionObserver connectionObserver) throws CertificateException {
75 trustManagers = getTrustManagers();
76 this.clientId = clientId;
77 this.messageSubscriber = messageSubscriber;
78 this.connectionObserver = connectionObserver;
81 private TrustManager[] getTrustManagers() throws CertificateException {
82 ResourceBundle certificatesBundle = ResourceBundle.getBundle("nikohomecontrol/certificates");
85 // Load server public certificates into key store
86 CertificateFactory cf = CertificateFactory.getInstance("X509");
87 InputStream certificateStream;
88 final KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType());
89 keyStore.load(null, null);
90 for (String certName : certificatesBundle.keySet()) {
91 certificateStream = new ByteArrayInputStream(
92 certificatesBundle.getString(certName).getBytes(StandardCharsets.UTF_8));
93 X509Certificate certificate = (X509Certificate) cf.generateCertificate(certificateStream);
94 keyStore.setCertificateEntry(certName, certificate);
97 ResourceBundle.clearCache();
99 // Create trust managers used to validate server
100 TrustManagerFactory tmFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
101 tmFactory.init(keyStore);
102 return tmFactory.getTrustManagers();
103 } catch (CertificateException | KeyStoreException | NoSuchAlgorithmException | IOException e) {
104 logger.debug("error with SSL context creation: {} ", e.getMessage());
105 throw new CertificateException("SSL context creation exception", e);
107 ResourceBundle.clearCache();
112 * Start a secure MQTT connection and subscribe to all topics.
114 * @param subscriber MqttMessageSubscriber that will handle received messages
115 * @param cocoAddress IP Address of the Niko Connected Controller
116 * @param port Port for MQTT communication with the Niko Connected Controller
117 * @param token JWT token for the hobby profile
118 * @throws MqttException
120 synchronized void startConnection(String cocoAddress, int port, String profile, String token) throws MqttException {
121 CompletableFuture<Boolean> future = stoppedFuture;
122 if (future != null) {
124 future.get(5000, TimeUnit.MILLISECONDS);
125 logger.debug("finished stopping connection");
126 } catch (InterruptedException | ExecutionException | TimeoutException ignore) {
127 logger.debug("error stopping connection");
129 stoppedFuture = null;
132 logger.debug("starting connection...");
133 this.cocoAddress = cocoAddress;
135 this.profile = profile;
137 MqttBrokerConnection connection = createMqttConnection();
138 connection.addConnectionObserver(connectionObserver);
139 mqttConnection = connection;
141 if (connection.start().get(5000, TimeUnit.MILLISECONDS)) {
142 if (subscribedFuture == null) {
143 subscribedFuture = connection.subscribe("#", messageSubscriber);
146 logger.debug("error connecting");
147 throw new MqttException("Connection execution exception");
149 } catch (InterruptedException e) {
150 logger.debug("connection interrupted exception");
151 throw new MqttException("Connection interrupted exception");
152 } catch (ExecutionException e) {
153 logger.debug("connection execution exception", e.getCause());
154 throw new MqttException("Connection execution exception");
155 } catch (TimeoutException e) {
156 logger.debug("connection timeout exception");
157 throw new MqttException("Connection timeout exception");
161 private MqttBrokerConnection createMqttConnection() throws MqttException {
162 MqttBrokerConnection connection = new MqttBrokerConnection(cocoAddress, port, true, false, clientId);
163 connection.setTrustManagers(trustManagers);
164 connection.setCredentials(profile, token);
165 connection.setQos(1);
167 // Don't use the transport periodic reconnect strategy. It doesn't restart the initialization when the
168 // connection is lost and creates extra threads that do not get cleaned up. Just stop it.
169 AbstractReconnectStrategy reconnectStrategy = connection.getReconnectStrategy();
170 if (reconnectStrategy != null) {
171 reconnectStrategy.stop();
177 * Stop the MQTT connection.
179 void stopConnection() {
180 logger.debug("stopping connection...");
181 MqttBrokerConnection connection = mqttConnection;
182 if (connection != null) {
183 connection.removeConnectionObserver(connectionObserver);
185 stoppedFuture = stopConnection(connection);
186 mqttConnection = null;
188 CompletableFuture<Boolean> future = subscribedFuture;
189 if (future != null) {
190 future.complete(false);
191 subscribedFuture = null;
195 private CompletableFuture<Boolean> stopConnection(@Nullable MqttBrokerConnection connection) {
196 if (connection != null) {
197 return connection.stop();
199 return CompletableFuture.completedFuture(true);
204 * @return true if connection established and subscribed to all topics
206 private boolean isConnected() {
207 MqttBrokerConnection connection = mqttConnection;
208 CompletableFuture<Boolean> future = subscribedFuture;
210 if (connection != null) {
212 if ((future != null) && future.get(5000, TimeUnit.MILLISECONDS)) {
213 MqttConnectionState state = connection.connectionState();
214 logger.debug("connection state {} for {}", state, connection.getClientId());
215 return state == MqttConnectionState.CONNECTED;
217 } catch (InterruptedException | ExecutionException | TimeoutException e) {
225 * Publish a message on the general connection.
229 * @throws MqttException
231 void connectionPublish(String topic, String payload) throws MqttException {
232 MqttBrokerConnection connection = mqttConnection;
233 if (connection == null) {
234 logger.debug("cannot publish, no connection");
235 throw new MqttException("No connection exception");
239 logger.debug("publish {}, {}", topic, payload);
240 connection.publish(topic, payload.getBytes(), connection.getQos(), false);
242 logger.debug("cannot publish, not subscribed to connection messages");
247 public void onSuccess(String topic) {
248 logger.debug("publish succeeded {}", topic);
252 public void onFailure(String topic, Throwable error) {
253 logger.debug("publish failed {}, {}", topic, error.getMessage(), error);