2 * Copyright (c) 2010-2020 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.io.mqttembeddedbroker.internal;
16 import java.io.IOException;
17 import java.io.InputStream;
18 import java.nio.file.Files;
19 import java.nio.file.Path;
20 import java.nio.file.Paths;
21 import java.security.KeyStore;
22 import java.security.KeyStoreException;
23 import java.security.NoSuchAlgorithmException;
24 import java.security.UnrecoverableKeyException;
25 import java.security.cert.CertificateException;
27 import java.util.Properties;
28 import java.util.concurrent.ExecutionException;
29 import java.util.concurrent.ScheduledExecutorService;
30 import java.util.concurrent.ScheduledThreadPoolExecutor;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.TimeoutException;
34 import javax.net.ssl.KeyManagerFactory;
36 import org.eclipse.jdt.annotation.NonNullByDefault;
37 import org.eclipse.jdt.annotation.Nullable;
38 import org.openhab.core.config.core.ConfigConstants;
39 import org.openhab.core.config.core.ConfigurableService;
40 import org.openhab.core.config.core.Configuration;
41 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
42 import org.openhab.core.io.transport.mqtt.MqttConnectionObserver;
43 import org.openhab.core.io.transport.mqtt.MqttConnectionState;
44 import org.openhab.core.io.transport.mqtt.MqttService;
45 import org.openhab.core.io.transport.mqtt.MqttServiceObserver;
46 import org.openhab.io.mqttembeddedbroker.Constants;
47 import org.openhab.io.mqttembeddedbroker.internal.MqttEmbeddedBrokerDetectStart.MqttEmbeddedBrokerStartedListener;
48 import org.osgi.service.component.annotations.Activate;
49 import org.osgi.service.component.annotations.Component;
50 import org.osgi.service.component.annotations.Deactivate;
51 import org.osgi.service.component.annotations.Modified;
52 import org.osgi.service.component.annotations.Reference;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
56 import io.moquette.BrokerConstants;
57 import io.moquette.broker.ISslContextCreator;
58 import io.moquette.broker.Server;
59 import io.moquette.broker.config.MemoryConfig;
60 import io.moquette.broker.security.IAuthenticator;
61 import io.moquette.broker.security.IAuthorizatorPolicy;
62 import io.moquette.interception.InterceptHandler;
63 import io.moquette.interception.messages.InterceptAcknowledgedMessage;
64 import io.moquette.interception.messages.InterceptConnectMessage;
65 import io.moquette.interception.messages.InterceptConnectionLostMessage;
66 import io.moquette.interception.messages.InterceptDisconnectMessage;
67 import io.moquette.interception.messages.InterceptPublishMessage;
68 import io.moquette.interception.messages.InterceptSubscribeMessage;
69 import io.moquette.interception.messages.InterceptUnsubscribeMessage;
70 import io.netty.handler.ssl.SslContext;
71 import io.netty.handler.ssl.SslContextBuilder;
74 * The {@link EmbeddedBrokerService} starts the embedded broker, creates a
75 * {@link MqttBrokerConnection} and adds it to the {@link MqttService}.
 * For now, TLS connections are offered with an accept-all trust manager
 * and a predefined keystore if "secure" is set to true.
80 * @author David Graeff - Initial contribution
82 @Component(immediate = true, service = EmbeddedBrokerService.class, configurationPid = "org.openhab.core.mqttembeddedbroker", property = {
83 org.osgi.framework.Constants.SERVICE_PID + "=org.openhab.core.mqttembeddedbroker",
84 ConfigurableService.SERVICE_PROPERTY_DESCRIPTION_URI + "=mqtt:mqttembeddedbroker",
85 ConfigurableService.SERVICE_PROPERTY_CATEGORY + "=MQTT",
86 ConfigurableService.SERVICE_PROPERTY_LABEL + "=MQTT Embedded Broker" })
88 public class EmbeddedBrokerService
89 implements MqttConnectionObserver, MqttServiceObserver, MqttEmbeddedBrokerStartedListener {
90 private final MqttService service;
91 private String persistenceFilename = "";
92 // private NetworkServerTls networkServerTls; //TODO wait for NetworkServerTls implementation
95 class BrokerMetricsListenerEx implements InterceptHandler {
98 public String getID() {
103 public Class<?>[] getInterceptedMessageTypes() {
104 return new Class<?>[] { InterceptConnectMessage.class, InterceptDisconnectMessage.class };
108 public void onConnect(InterceptConnectMessage arg0) {
109 logger.debug("MQTT Client connected: {}", arg0.getClientID());
113 public void onConnectionLost(InterceptConnectionLostMessage arg0) {
117 public void onDisconnect(InterceptDisconnectMessage arg0) {
118 logger.debug("MQTT Client disconnected: {}", arg0.getClientID());
122 public void onMessageAcknowledged(InterceptAcknowledgedMessage arg0) {
126 public void onPublish(InterceptPublishMessage arg0) {
130 public void onSubscribe(InterceptSubscribeMessage arg0) {
134 public void onUnsubscribe(InterceptUnsubscribeMessage arg0) {
138 protected @Nullable Server server;
139 private final Logger logger = LoggerFactory.getLogger(EmbeddedBrokerService.class);
140 protected MqttEmbeddedBrokerDetectStart detectStart = new MqttEmbeddedBrokerDetectStart(this);
141 protected BrokerMetricsListenerEx metrics = new BrokerMetricsListenerEx();
143 private @Nullable MqttBrokerConnection connection;
/**
 * Creates the service and directly initializes it, which starts the embedded broker.
 *
 * @param mqttService the MQTT service that the broker connection is later added to
 * @param configuration the raw service configuration, passed on to {@link #initialize(Map)}
 * @throws IOException if {@link #initialize(Map)} fails
 */
@Activate
public EmbeddedBrokerService(@Reference MqttService mqttService, Map<String, Object> configuration)
        throws IOException {
    service = mqttService;
    initialize(configuration);
}
153 public void modified(Map<String, Object> configuration) throws IOException {
155 initialize(configuration);
158 public void initialize(Map<String, Object> configuration) throws IOException {
159 ServiceConfiguration config = new Configuration(configuration).as(ServiceConfiguration.class);
160 int port = config.port == null ? (config.port = config.secure ? 8883 : 1883) : config.port;
162 // Create MqttBrokerConnection
163 connection = service.getBrokerConnection(Constants.CLIENTID);
164 if (connection != null) {
165 // Close the existing connection and remove it from the service
167 service.removeBrokerConnection(Constants.CLIENTID);
170 connection = new MqttBrokerConnection("localhost", config.port, config.secure, Constants.CLIENTID);
171 connection.addConnectionObserver(this);
173 if (config.username != null) {
174 connection.setCredentials(config.username, config.password);
177 if (!config.persistenceFile.isEmpty()) {
178 final String persistenceFilename = config.persistenceFile;
179 if (!Paths.get(persistenceFilename).isAbsolute()) {
180 Path path = Paths.get(ConfigConstants.getUserDataFolder()).toAbsolutePath();
181 Files.createDirectories(path);
182 this.persistenceFilename = path.resolve(persistenceFilename).toString();
185 logger.info("Broker persistence file: {}", persistenceFilename);
187 logger.info("Using in-memory persistence. No persistence file has been set!");
190 // Start embedded server
191 startEmbeddedServer(port, config.secure, config.username, config.password);
195 public void deactivate() {
196 if (service != null) {
197 service.removeBrokersListener(this);
199 MqttBrokerConnection connection = this.connection;
200 if (connection == null) {
201 if (server != null) {
208 // Clean shutdown: Stop connection, wait for process to finish, shutdown server
209 connection.removeConnectionObserver(this);
211 connection.stop().thenRun(() -> {
212 if (server != null) {
216 }).get(10, TimeUnit.SECONDS);
217 } catch (InterruptedException | ExecutionException | TimeoutException e) {
218 logger.warn("Could not cleanly shutdown connection or server.", e);
224 public void brokerAdded(String brokerID, MqttBrokerConnection broker) {
227 @SuppressWarnings("null")
229 public void brokerRemoved(String brokerID, MqttBrokerConnection broker) {
230 // Do not allow this connection to be removed. Add it again.
231 if (broker.equals(connection)) {
232 service.addBrokerConnection(brokerID, broker);
237 * For TLS connections we need to setup a keystore and provide Moquette/Netty with an {@link SslContext}.
239 * If a context is requested by Moquette, this creator
240 * will use the bundled "serverkeystore.keystore" with password "openhab".
242 * @return An SslContext creator (not be confused with javas SSLContext).
244 ISslContextCreator nettySSLcontextCreator() {
247 InputStream inputStream = getClass().getClassLoader().getResourceAsStream("serverkeystore.keystore");
248 KeyStore keyStore = KeyStore.getInstance("jks");
249 keyStore.load(inputStream, "openhab".toCharArray());
250 KeyManagerFactory factory = KeyManagerFactory.getInstance("SunX509");
251 factory.init(keyStore, "openhab".toCharArray());
252 return SslContextBuilder.forServer(factory).build();
253 } catch (NoSuchAlgorithmException | CertificateException | IOException | KeyStoreException
254 | UnrecoverableKeyException e) {
255 logger.warn("Failed to create an SSL context");
261 public void startEmbeddedServer(@Nullable Integer portParam, boolean secure, @Nullable String username,
262 @Nullable String password) throws IOException {
263 Server server = new Server();
264 Properties properties = new Properties();
267 properties.put(BrokerConstants.HOST_PROPERTY_NAME, "0.0.0.0");
270 port = (portParam == null) ? port = 8883 : portParam;
271 properties.put(BrokerConstants.SSL_PORT_PROPERTY_NAME, Integer.toString(port));
272 properties.put(BrokerConstants.PORT_PROPERTY_NAME, BrokerConstants.DISABLED_PORT_BIND);
273 properties.put(BrokerConstants.KEY_MANAGER_PASSWORD_PROPERTY_NAME, "esheshesh");
274 properties.put(BrokerConstants.JKS_PATH_PROPERTY_NAME, "serverkeystore.jks");
276 port = (portParam == null) ? port = 1883 : portParam;
277 // with SSL_PORT_PROPERTY_NAME set, netty tries to evaluate the SSL context and shuts down immediately.
278 // properties.put(BrokerConstants.SSL_PORT_PROPERTY_NAME, BrokerConstants.DISABLED_PORT_BIND);
279 properties.put(BrokerConstants.PORT_PROPERTY_NAME, Integer.toString(port));
283 IAuthenticator authentificator = null;
284 if (username != null && password != null && username.length() > 0 && password.length() > 0) {
285 properties.setProperty(BrokerConstants.ALLOW_ANONYMOUS_PROPERTY_NAME, Boolean.FALSE.toString());
286 properties.put(BrokerConstants.AUTHENTICATOR_CLASS_NAME,
287 MqttEmbeddedBrokerUserAuthenticator.class.getName());
288 authentificator = new MqttEmbeddedBrokerUserAuthenticator(username, password.getBytes());
289 logger.debug("Broker authentication is enabled");
291 properties.put(BrokerConstants.ALLOW_ANONYMOUS_PROPERTY_NAME, Boolean.TRUE.toString());
292 logger.debug("Broker anonymous access enabled");
295 if (!persistenceFilename.isEmpty()) { // Persistence: If not set, an in-memory database is used.
296 properties.put(BrokerConstants.PERSISTENT_STORE_PROPERTY_NAME, persistenceFilename);
297 properties.put(BrokerConstants.AUTOSAVE_INTERVAL_PROPERTY_NAME, "30"); // in seconds
300 // We may provide ACL functionality at some point as well
301 IAuthorizatorPolicy authorizer = null;
302 ISslContextCreator sslContextCreator = secure ? nettySSLcontextCreator() : null;
305 server.startServer(new MemoryConfig(properties), null, sslContextCreator, authentificator, authorizer);
306 } catch (IllegalArgumentException e) {
307 if (e.getMessage().contains("Could not deserialize")) {
308 Path persistenceFilePath = Paths.get((new File(persistenceFilename)).getAbsolutePath());
309 logger.warn("persistence corrupt: {}, deleting {}", e.getMessage(), persistenceFilePath);
310 Files.delete(persistenceFilePath);
311 // retry starting broker, if it fails again, don't catch exception
312 server.startServer(new MemoryConfig(properties), null, sslContextCreator, authentificator, authorizer);
315 this.server = server;
316 server.addInterceptHandler(metrics);
317 ScheduledExecutorService s = new ScheduledThreadPoolExecutor(1);
318 detectStart.startBrokerStartedDetection(port, s);
321 public void stopEmbeddedServer() {
322 Server server = this.server;
323 if (server != null) {
324 server.removeInterceptHandler(metrics);
325 detectStart.stopBrokerStartDetection();
332 * For testing: Returns true if the embedded server confirms that the MqttBrokerConnection is connected.
334 protected boolean serverConfirmsEmbeddedClient() {
335 return server != null && server.listConnectedClients().stream()
336 .anyMatch(client -> Constants.CLIENTID.equals(client.getClientID()));
340 public void connectionStateChanged(MqttConnectionState state, @Nullable Throwable error) {
341 if (state == MqttConnectionState.CONNECTED) {
342 logger.debug("Embedded broker connection connected");
343 } else if (state == MqttConnectionState.CONNECTING) {
344 logger.debug("Embedded broker connection still connecting");
347 logger.warn("Embedded broker offline - Reason unknown");
349 logger.warn("Embedded broker offline", error);
353 if (state != MqttConnectionState.CONNECTED && state != MqttConnectionState.CONNECTING) {
354 stopEmbeddedServer();
359 * The callback from the detectStart.startBrokerStartedDetection() call within
360 * {@link #startEmbeddedServer(Integer, boolean, String, String, String)}.
363 public void mqttEmbeddedBrokerStarted(boolean timeout) {
364 MqttBrokerConnection connection = this.connection;
365 MqttService service = this.service;
366 if (connection == null || service == null) {
369 service.addBrokerConnection(Constants.CLIENTID, connection);
371 connection.start().exceptionally(e -> {
372 connectionStateChanged(MqttConnectionState.DISCONNECTED, e);
376 connectionStateChanged(MqttConnectionState.DISCONNECTED, new TimeoutException("Timeout"));
381 public @Nullable MqttBrokerConnection getConnection() {
385 public String getPersistenceFilename() {
386 return persistenceFilename;
389 public void setPersistenceFilename(String persistenceFilename) {
390 this.persistenceFilename = persistenceFilename;