2 * Copyright (c) 2010-2020 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.io.mqttembeddedbroker.internal;
16 import java.io.IOException;
17 import java.io.InputStream;
18 import java.nio.file.Files;
19 import java.nio.file.Path;
20 import java.nio.file.Paths;
21 import java.security.KeyStore;
22 import java.security.KeyStoreException;
23 import java.security.NoSuchAlgorithmException;
24 import java.security.UnrecoverableKeyException;
25 import java.security.cert.CertificateException;
27 import java.util.Properties;
28 import java.util.concurrent.ExecutionException;
29 import java.util.concurrent.ScheduledExecutorService;
30 import java.util.concurrent.ScheduledThreadPoolExecutor;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.TimeoutException;
34 import javax.net.ssl.KeyManagerFactory;
36 import org.eclipse.jdt.annotation.NonNullByDefault;
37 import org.eclipse.jdt.annotation.Nullable;
38 import org.openhab.core.OpenHAB;
39 import org.openhab.core.config.core.ConfigurableService;
40 import org.openhab.core.config.core.Configuration;
41 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
42 import org.openhab.core.io.transport.mqtt.MqttConnectionObserver;
43 import org.openhab.core.io.transport.mqtt.MqttConnectionState;
44 import org.openhab.core.io.transport.mqtt.MqttService;
45 import org.openhab.core.io.transport.mqtt.MqttServiceObserver;
46 import org.openhab.io.mqttembeddedbroker.Constants;
47 import org.openhab.io.mqttembeddedbroker.internal.MqttEmbeddedBrokerDetectStart.MqttEmbeddedBrokerStartedListener;
48 import org.osgi.service.component.annotations.Activate;
49 import org.osgi.service.component.annotations.Component;
50 import org.osgi.service.component.annotations.Deactivate;
51 import org.osgi.service.component.annotations.Modified;
52 import org.osgi.service.component.annotations.Reference;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
56 import io.moquette.BrokerConstants;
57 import io.moquette.broker.ISslContextCreator;
58 import io.moquette.broker.Server;
59 import io.moquette.broker.config.MemoryConfig;
60 import io.moquette.broker.security.IAuthenticator;
61 import io.moquette.broker.security.IAuthorizatorPolicy;
62 import io.moquette.interception.InterceptHandler;
63 import io.moquette.interception.messages.InterceptAcknowledgedMessage;
64 import io.moquette.interception.messages.InterceptConnectMessage;
65 import io.moquette.interception.messages.InterceptConnectionLostMessage;
66 import io.moquette.interception.messages.InterceptDisconnectMessage;
67 import io.moquette.interception.messages.InterceptPublishMessage;
68 import io.moquette.interception.messages.InterceptSubscribeMessage;
69 import io.moquette.interception.messages.InterceptUnsubscribeMessage;
70 import io.netty.handler.ssl.SslContext;
71 import io.netty.handler.ssl.SslContextBuilder;
74 * The {@link EmbeddedBrokerService} starts the embedded broker, creates a
75 * {@link MqttBrokerConnection} and adds it to the {@link MqttService}.
77 * For now, TLS connections are offered with an accept-all trust manager
78 * and a predefined keystore if "secure" is set to true.
80 * @author David Graeff - Initial contribution
82 @Component(immediate = true, service = EmbeddedBrokerService.class, configurationPid = Constants.PID, //
83 property = org.osgi.framework.Constants.SERVICE_PID + "=" + Constants.PID)
84 @ConfigurableService(category = "MQTT", label = "MQTT Embedded Broker", description_uri = "mqtt:mqttembeddedbroker")
86 public class EmbeddedBrokerService
87 implements MqttConnectionObserver, MqttServiceObserver, MqttEmbeddedBrokerStartedListener {
// MQTT service that broker connections are looked up in, added to and removed from.
88 private final MqttService service;
// Path of the broker persistence file; while this is empty no persistence file is handed to the broker.
89 private String persistenceFilename = "";
90 // private NetworkServerTls networkServerTls; //TODO wait for NetworkServerTls implementation
// Intercept handler that is added to the embedded server (via the "metrics" field) and logs
// client connect and disconnect events at debug level.
// NOTE(review): several lines of this class (method bodies, closing braces) are not visible in this excerpt.
93 class BrokerMetricsListenerEx implements InterceptHandler {
// Identifier of this handler; the returned value is not visible in this excerpt.
96 public String getID() {
// Only connect and disconnect messages are requested.
101 public Class<?>[] getInterceptedMessageTypes() {
102 return new Class<?>[] { InterceptConnectMessage.class, InterceptDisconnectMessage.class };
// Logs the ID of the client that connected.
106 public void onConnect(InterceptConnectMessage arg0) {
107 logger.debug("MQTT Client connected: {}", arg0.getClientID());
// No statements are visible for this callback in this excerpt.
111 public void onConnectionLost(InterceptConnectionLostMessage arg0) {
// Logs the ID of the client that disconnected.
115 public void onDisconnect(InterceptDisconnectMessage arg0) {
116 logger.debug("MQTT Client disconnected: {}", arg0.getClientID());
// No statements are visible for the remaining callbacks in this excerpt.
120 public void onMessageAcknowledged(InterceptAcknowledgedMessage arg0) {
124 public void onPublish(InterceptPublishMessage arg0) {
128 public void onSubscribe(InterceptSubscribeMessage arg0) {
132 public void onUnsubscribe(InterceptUnsubscribeMessage arg0) {
// The embedded broker instance; assigned in startEmbeddedServer() once the server has been started.
136 protected @Nullable Server server;
137 private final Logger logger = LoggerFactory.getLogger(EmbeddedBrokerService.class);
// Broker start detection helper, constructed with this service (presumably as its callback, see
// mqttEmbeddedBrokerStarted() - confirm).
138 protected MqttEmbeddedBrokerDetectStart detectStart = new MqttEmbeddedBrokerDetectStart(this);
// Intercept handler that is added to the server on start and removed again in stopEmbeddedServer().
139 protected BrokerMetricsListenerEx metrics = new BrokerMetricsListenerEx();
// Connection of this service to the embedded broker; created in initialize().
141 private @Nullable MqttBrokerConnection connection;
// Creates the service: stores the injected MqttService and applies the given configuration
// through initialize().
// NOTE(review): the line(s) between the parameter list and the body are not visible in this excerpt.
144 public EmbeddedBrokerService(@Reference MqttService mqttService, Map<String, Object> configuration)
146 this.service = mqttService;
147 initialize(configuration);
// Applies a changed configuration by running initialize() with the new values.
// NOTE(review): one line of the body before the initialize() call is not visible in this excerpt.
151 public void modified(Map<String, Object> configuration) throws IOException {
153 initialize(configuration);
156 public void initialize(Map<String, Object> configuration) throws IOException {
157 ServiceConfiguration config = new Configuration(configuration).as(ServiceConfiguration.class);
158 int port = config.port == null ? (config.port = config.secure ? 8883 : 1883) : config.port;
160 // Create MqttBrokerConnection
161 connection = service.getBrokerConnection(Constants.CLIENTID);
162 if (connection != null) {
163 // Close the existing connection and remove it from the service
165 service.removeBrokerConnection(Constants.CLIENTID);
168 connection = new MqttBrokerConnection("localhost", config.port, config.secure, Constants.CLIENTID);
169 connection.addConnectionObserver(this);
171 if (config.username != null) {
172 connection.setCredentials(config.username, config.password);
175 if (!config.persistenceFile.isEmpty()) {
176 final String persistenceFilename = config.persistenceFile;
177 if (!Paths.get(persistenceFilename).isAbsolute()) {
178 Path path = Paths.get(OpenHAB.getUserDataFolder()).toAbsolutePath();
179 Files.createDirectories(path);
180 this.persistenceFilename = path.resolve(persistenceFilename).toString();
183 logger.info("Broker persistence file: {}", persistenceFilename);
185 logger.info("Using in-memory persistence. No persistence file has been set!");
188 // Start embedded server
189 startEmbeddedServer(port, config.secure, config.username, config.password);
193 public void deactivate() {
194 if (service != null) {
195 service.removeBrokersListener(this);
197 MqttBrokerConnection connection = this.connection;
198 if (connection == null) {
199 if (server != null) {
206 // Clean shutdown: Stop connection, wait for process to finish, shutdown server
207 connection.removeConnectionObserver(this);
209 connection.stop().thenRun(() -> {
210 if (server != null) {
214 }).get(10, TimeUnit.SECONDS);
215 } catch (InterruptedException | ExecutionException | TimeoutException e) {
216 logger.warn("Could not cleanly shutdown connection or server.", e);
// Notification about an added broker connection (presumably the MqttServiceObserver callback - confirm).
// NOTE(review): no body statements are visible in this excerpt.
222 public void brokerAdded(String brokerID, MqttBrokerConnection broker) {
225 @SuppressWarnings("null")
227 public void brokerRemoved(String brokerID, MqttBrokerConnection broker) {
228 // Do not allow this connection to be removed. Add it again.
229 if (broker.equals(connection)) {
230 service.addBrokerConnection(brokerID, broker);
235 * For TLS connections we need to setup a keystore and provide Moquette/Netty with an {@link SslContext}.
237 * If a context is requested by Moquette, this creator
238 * will use the bundled "serverkeystore.keystore" with password "openhab".
240 * @return An SslContext creator (not be confused with javas SSLContext).
242 ISslContextCreator nettySSLcontextCreator() {
245 InputStream inputStream = getClass().getClassLoader().getResourceAsStream("serverkeystore.keystore");
246 KeyStore keyStore = KeyStore.getInstance("jks");
247 keyStore.load(inputStream, "openhab".toCharArray());
248 KeyManagerFactory factory = KeyManagerFactory.getInstance("SunX509");
249 factory.init(keyStore, "openhab".toCharArray());
250 return SslContextBuilder.forServer(factory).build();
251 } catch (NoSuchAlgorithmException | CertificateException | IOException | KeyStoreException
252 | UnrecoverableKeyException e) {
253 logger.warn("Failed to create an SSL context");
259 public void startEmbeddedServer(@Nullable Integer portParam, boolean secure, @Nullable String username,
260 @Nullable String password) throws IOException {
261 Server server = new Server();
262 Properties properties = new Properties();
265 properties.put(BrokerConstants.HOST_PROPERTY_NAME, "0.0.0.0");
268 port = (portParam == null) ? port = 8883 : portParam;
269 properties.put(BrokerConstants.SSL_PORT_PROPERTY_NAME, Integer.toString(port));
270 properties.put(BrokerConstants.PORT_PROPERTY_NAME, BrokerConstants.DISABLED_PORT_BIND);
271 properties.put(BrokerConstants.KEY_MANAGER_PASSWORD_PROPERTY_NAME, "esheshesh");
272 properties.put(BrokerConstants.JKS_PATH_PROPERTY_NAME, "serverkeystore.jks");
274 port = (portParam == null) ? port = 1883 : portParam;
275 // with SSL_PORT_PROPERTY_NAME set, netty tries to evaluate the SSL context and shuts down immediately.
276 // properties.put(BrokerConstants.SSL_PORT_PROPERTY_NAME, BrokerConstants.DISABLED_PORT_BIND);
277 properties.put(BrokerConstants.PORT_PROPERTY_NAME, Integer.toString(port));
281 IAuthenticator authentificator = null;
282 if (username != null && password != null && username.length() > 0 && password.length() > 0) {
283 properties.setProperty(BrokerConstants.ALLOW_ANONYMOUS_PROPERTY_NAME, Boolean.FALSE.toString());
284 properties.put(BrokerConstants.AUTHENTICATOR_CLASS_NAME,
285 MqttEmbeddedBrokerUserAuthenticator.class.getName());
286 authentificator = new MqttEmbeddedBrokerUserAuthenticator(username, password.getBytes());
287 logger.debug("Broker authentication is enabled");
289 properties.put(BrokerConstants.ALLOW_ANONYMOUS_PROPERTY_NAME, Boolean.TRUE.toString());
290 logger.debug("Broker anonymous access enabled");
293 if (!persistenceFilename.isEmpty()) { // Persistence: If not set, an in-memory database is used.
294 properties.put(BrokerConstants.PERSISTENT_STORE_PROPERTY_NAME, persistenceFilename);
295 properties.put(BrokerConstants.AUTOSAVE_INTERVAL_PROPERTY_NAME, "30"); // in seconds
298 // We may provide ACL functionality at some point as well
299 IAuthorizatorPolicy authorizer = null;
300 ISslContextCreator sslContextCreator = secure ? nettySSLcontextCreator() : null;
303 server.startServer(new MemoryConfig(properties), null, sslContextCreator, authentificator, authorizer);
304 } catch (IllegalArgumentException e) {
305 if (e.getMessage().contains("Could not deserialize")) {
306 Path persistenceFilePath = Paths.get((new File(persistenceFilename)).getAbsolutePath());
307 logger.warn("persistence corrupt: {}, deleting {}", e.getMessage(), persistenceFilePath);
308 Files.delete(persistenceFilePath);
309 // retry starting broker, if it fails again, don't catch exception
310 server.startServer(new MemoryConfig(properties), null, sslContextCreator, authentificator, authorizer);
312 } catch (Exception e) {
313 logger.warn("Failed to start embedded MQTT server: {}", e.getMessage());
318 this.server = server;
319 server.addInterceptHandler(metrics);
320 ScheduledExecutorService s = new ScheduledThreadPoolExecutor(1);
321 detectStart.startBrokerStartedDetection(port, s);
// Stops the embedded server: if a server is present, the metrics intercept handler is removed
// from it and the broker start detection is stopped.
// NOTE(review): the remaining lines of this method are not visible in this excerpt - presumably
// the server itself is stopped there; confirm.
324 public void stopEmbeddedServer() {
// Work on a local copy of the nullable field.
325 Server server = this.server;
326 if (server != null) {
327 server.removeInterceptHandler(metrics);
328 detectStart.stopBrokerStartDetection();
335 * For testing: Returns true if the embedded server confirms that the MqttBrokerConnection is connected.
337 protected boolean serverConfirmsEmbeddedClient() {
338 return server != null && server.listConnectedClients().stream()
339 .anyMatch(client -> Constants.CLIENTID.equals(client.getClientID()));
// Connection observer callback: logs the new state of the connection to the embedded broker and
// stops the embedded server for every state other than CONNECTED and CONNECTING.
// NOTE(review): the lines that choose between the two "offline" log messages are not visible in
// this excerpt - presumably the choice depends on whether an error was passed; confirm.
343 public void connectionStateChanged(MqttConnectionState state, @Nullable Throwable error) {
344 if (state == MqttConnectionState.CONNECTED) {
345 logger.debug("Embedded broker connection connected");
346 } else if (state == MqttConnectionState.CONNECTING) {
347 logger.debug("Embedded broker connection still connecting");
350 logger.warn("Embedded broker offline - Reason unknown");
352 logger.warn("Embedded broker offline", error);
// Any state that is neither connected nor connecting leads to a server stop.
356 if (state != MqttConnectionState.CONNECTED && state != MqttConnectionState.CONNECTING) {
357 stopEmbeddedServer();
362 * The callback from the detectStart.startBrokerStartedDetection() call within
363 * {@link #startEmbeddedServer(Integer, boolean, String, String)}.
// Adds the connection of this service to the MqttService and starts it; a start that completes
// exceptionally is reported through connectionStateChanged() as DISCONNECTED.
// NOTE(review): the branch structure around the "timeout" flag is not visible in this excerpt -
// presumably the TimeoutException is reported when "timeout" is true; confirm.
366 public void mqttEmbeddedBrokerStarted(boolean timeout) {
// Work on local copies of the fields.
367 MqttBrokerConnection connection = this.connection;
368 MqttService service = this.service;
369 if (connection == null || service == null) {
372 service.addBrokerConnection(Constants.CLIENTID, connection);
374 connection.start().exceptionally(e -> {
375 connectionStateChanged(MqttConnectionState.DISCONNECTED, e);
379 connectionStateChanged(MqttConnectionState.DISCONNECTED, new TimeoutException("Timeout"));
// Accessor for the broker connection; the result may be null according to the @Nullable return type.
// NOTE(review): the body is not visible in this excerpt.
384 public @Nullable MqttBrokerConnection getConnection() {
388 public String getPersistenceFilename() {
389 return persistenceFilename;
392 public void setPersistenceFilename(String persistenceFilename) {
393 this.persistenceFilename = persistenceFilename;