]> git.basschouten.com Git - openhab-addons.git/blob
010565551caae443b9cac9028cc2ec3d6cce1605
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2020 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.io.mqttembeddedbroker.internal;
14
15 import java.io.File;
16 import java.io.IOException;
17 import java.io.InputStream;
18 import java.nio.file.Files;
19 import java.nio.file.Path;
20 import java.nio.file.Paths;
21 import java.security.KeyStore;
22 import java.security.KeyStoreException;
23 import java.security.NoSuchAlgorithmException;
24 import java.security.UnrecoverableKeyException;
25 import java.security.cert.CertificateException;
26 import java.util.Map;
27 import java.util.Properties;
28 import java.util.concurrent.ExecutionException;
29 import java.util.concurrent.ScheduledExecutorService;
30 import java.util.concurrent.ScheduledThreadPoolExecutor;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.TimeoutException;
33
34 import javax.net.ssl.KeyManagerFactory;
35
36 import org.eclipse.jdt.annotation.NonNullByDefault;
37 import org.eclipse.jdt.annotation.Nullable;
38 import org.openhab.core.config.core.ConfigConstants;
39 import org.openhab.core.config.core.ConfigurableService;
40 import org.openhab.core.config.core.Configuration;
41 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
42 import org.openhab.core.io.transport.mqtt.MqttConnectionObserver;
43 import org.openhab.core.io.transport.mqtt.MqttConnectionState;
44 import org.openhab.core.io.transport.mqtt.MqttService;
45 import org.openhab.core.io.transport.mqtt.MqttServiceObserver;
46 import org.openhab.io.mqttembeddedbroker.Constants;
47 import org.openhab.io.mqttembeddedbroker.internal.MqttEmbeddedBrokerDetectStart.MqttEmbeddedBrokerStartedListener;
48 import org.osgi.service.component.annotations.Activate;
49 import org.osgi.service.component.annotations.Component;
50 import org.osgi.service.component.annotations.Deactivate;
51 import org.osgi.service.component.annotations.Modified;
52 import org.osgi.service.component.annotations.Reference;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
55
56 import io.moquette.BrokerConstants;
57 import io.moquette.broker.ISslContextCreator;
58 import io.moquette.broker.Server;
59 import io.moquette.broker.config.MemoryConfig;
60 import io.moquette.broker.security.IAuthenticator;
61 import io.moquette.broker.security.IAuthorizatorPolicy;
62 import io.moquette.interception.InterceptHandler;
63 import io.moquette.interception.messages.InterceptAcknowledgedMessage;
64 import io.moquette.interception.messages.InterceptConnectMessage;
65 import io.moquette.interception.messages.InterceptConnectionLostMessage;
66 import io.moquette.interception.messages.InterceptDisconnectMessage;
67 import io.moquette.interception.messages.InterceptPublishMessage;
68 import io.moquette.interception.messages.InterceptSubscribeMessage;
69 import io.moquette.interception.messages.InterceptUnsubscribeMessage;
70 import io.netty.handler.ssl.SslContext;
71 import io.netty.handler.ssl.SslContextBuilder;
72
73 /**
74  * The {@link EmbeddedBrokerService} starts the embedded broker, creates a
75  * {@link MqttBrokerConnection} and adds it to the {@link MqttService}.
76  * <p>
77  * For now tls connections are offered with an accept-all trust manager
78  * and a predefined keystore if "secure" is set to true.
79  *
80  * @author David Graeff - Initial contribution
81  */
82 @Component(immediate = true, service = EmbeddedBrokerService.class, configurationPid = "org.openhab.core.mqttembeddedbroker", property = {
83         org.osgi.framework.Constants.SERVICE_PID + "=org.openhab.core.mqttembeddedbroker",
84         ConfigurableService.SERVICE_PROPERTY_DESCRIPTION_URI + "=mqtt:mqttembeddedbroker",
85         ConfigurableService.SERVICE_PROPERTY_CATEGORY + "=MQTT",
86         ConfigurableService.SERVICE_PROPERTY_LABEL + "=MQTT Embedded Broker" })
87 @NonNullByDefault
88 public class EmbeddedBrokerService
89         implements MqttConnectionObserver, MqttServiceObserver, MqttEmbeddedBrokerStartedListener {
90     private final MqttService service;
91     private String persistenceFilename = "";
92     // private NetworkServerTls networkServerTls; //TODO wait for NetworkServerTls implementation
93
94     @NonNullByDefault({})
95     class BrokerMetricsListenerEx implements InterceptHandler {
96
97         @Override
98         public String getID() {
99             return "logger";
100         }
101
102         @Override
103         public Class<?>[] getInterceptedMessageTypes() {
104             return new Class<?>[] { InterceptConnectMessage.class, InterceptDisconnectMessage.class };
105         }
106
107         @Override
108         public void onConnect(InterceptConnectMessage arg0) {
109             logger.debug("MQTT Client connected: {}", arg0.getClientID());
110         }
111
112         @Override
113         public void onConnectionLost(InterceptConnectionLostMessage arg0) {
114         }
115
116         @Override
117         public void onDisconnect(InterceptDisconnectMessage arg0) {
118             logger.debug("MQTT Client disconnected: {}", arg0.getClientID());
119         }
120
121         @Override
122         public void onMessageAcknowledged(InterceptAcknowledgedMessage arg0) {
123         }
124
125         @Override
126         public void onPublish(InterceptPublishMessage arg0) {
127         }
128
129         @Override
130         public void onSubscribe(InterceptSubscribeMessage arg0) {
131         }
132
133         @Override
134         public void onUnsubscribe(InterceptUnsubscribeMessage arg0) {
135         }
136     }
137
138     protected @Nullable Server server;
139     private final Logger logger = LoggerFactory.getLogger(EmbeddedBrokerService.class);
140     protected MqttEmbeddedBrokerDetectStart detectStart = new MqttEmbeddedBrokerDetectStart(this);
141     protected BrokerMetricsListenerEx metrics = new BrokerMetricsListenerEx();
142
143     private @Nullable MqttBrokerConnection connection;
144
145     @Activate
146     public EmbeddedBrokerService(@Reference MqttService mqttService, Map<String, Object> configuration)
147             throws IOException {
148         this.service = mqttService;
149         initialize(configuration);
150     }
151
152     @Modified
153     public void modified(Map<String, Object> configuration) throws IOException {
154         deactivate();
155         initialize(configuration);
156     }
157
158     public void initialize(Map<String, Object> configuration) throws IOException {
159         ServiceConfiguration config = new Configuration(configuration).as(ServiceConfiguration.class);
160         int port = config.port == null ? (config.port = config.secure ? 8883 : 1883) : config.port;
161
162         // Create MqttBrokerConnection
163         connection = service.getBrokerConnection(Constants.CLIENTID);
164         if (connection != null) {
165             // Close the existing connection and remove it from the service
166             connection.stop();
167             service.removeBrokerConnection(Constants.CLIENTID);
168         }
169
170         connection = new MqttBrokerConnection("localhost", config.port, config.secure, Constants.CLIENTID);
171         connection.addConnectionObserver(this);
172
173         if (config.username != null) {
174             connection.setCredentials(config.username, config.password);
175         }
176
177         if (!config.persistenceFile.isEmpty()) {
178             final String persistenceFilename = config.persistenceFile;
179             if (!Paths.get(persistenceFilename).isAbsolute()) {
180                 Path path = Paths.get(ConfigConstants.getUserDataFolder()).toAbsolutePath();
181                 Files.createDirectories(path);
182                 this.persistenceFilename = path.resolve(persistenceFilename).toString();
183             }
184
185             logger.info("Broker persistence file: {}", persistenceFilename);
186         } else {
187             logger.info("Using in-memory persistence. No persistence file has been set!");
188         }
189
190         // Start embedded server
191         startEmbeddedServer(port, config.secure, config.username, config.password);
192     }
193
194     @Deactivate
195     public void deactivate() {
196         if (service != null) {
197             service.removeBrokersListener(this);
198         }
199         MqttBrokerConnection connection = this.connection;
200         if (connection == null) {
201             if (server != null) {
202                 server.stopServer();
203             }
204             server = null;
205             return;
206         }
207
208         // Clean shutdown: Stop connection, wait for process to finish, shutdown server
209         connection.removeConnectionObserver(this);
210         try {
211             connection.stop().thenRun(() -> {
212                 if (server != null) {
213                     server.stopServer();
214                     server = null;
215                 }
216             }).get(10, TimeUnit.SECONDS);
217         } catch (InterruptedException | ExecutionException | TimeoutException e) {
218             logger.warn("Could not cleanly shutdown connection or server.", e);
219         }
220         connection = null;
221     }
222
223     @Override
224     public void brokerAdded(String brokerID, MqttBrokerConnection broker) {
225     }
226
227     @SuppressWarnings("null")
228     @Override
229     public void brokerRemoved(String brokerID, MqttBrokerConnection broker) {
230         // Do not allow this connection to be removed. Add it again.
231         if (broker.equals(connection)) {
232             service.addBrokerConnection(brokerID, broker);
233         }
234     }
235
236     /**
237      * For TLS connections we need to setup a keystore and provide Moquette/Netty with an {@link SslContext}.
238      * <p>
239      * If a context is requested by Moquette, this creator
240      * will use the bundled "serverkeystore.keystore" with password "openhab".
241      *
242      * @return An SslContext creator (not be confused with javas SSLContext).
243      */
244     ISslContextCreator nettySSLcontextCreator() {
245         return () -> {
246             try {
247                 InputStream inputStream = getClass().getClassLoader().getResourceAsStream("serverkeystore.keystore");
248                 KeyStore keyStore = KeyStore.getInstance("jks");
249                 keyStore.load(inputStream, "openhab".toCharArray());
250                 KeyManagerFactory factory = KeyManagerFactory.getInstance("SunX509");
251                 factory.init(keyStore, "openhab".toCharArray());
252                 return SslContextBuilder.forServer(factory).build();
253             } catch (NoSuchAlgorithmException | CertificateException | IOException | KeyStoreException
254                     | UnrecoverableKeyException e) {
255                 logger.warn("Failed to create an SSL context");
256                 return null;
257             }
258         };
259     }
260
261     public void startEmbeddedServer(@Nullable Integer portParam, boolean secure, @Nullable String username,
262             @Nullable String password) throws IOException {
263         Server server = new Server();
264         Properties properties = new Properties();
265
266         // Host and port
267         properties.put(BrokerConstants.HOST_PROPERTY_NAME, "0.0.0.0");
268         int port;
269         if (secure) {
270             port = (portParam == null) ? port = 8883 : portParam;
271             properties.put(BrokerConstants.SSL_PORT_PROPERTY_NAME, Integer.toString(port));
272             properties.put(BrokerConstants.PORT_PROPERTY_NAME, BrokerConstants.DISABLED_PORT_BIND);
273             properties.put(BrokerConstants.KEY_MANAGER_PASSWORD_PROPERTY_NAME, "esheshesh");
274             properties.put(BrokerConstants.JKS_PATH_PROPERTY_NAME, "serverkeystore.jks");
275         } else {
276             port = (portParam == null) ? port = 1883 : portParam;
277             // with SSL_PORT_PROPERTY_NAME set, netty tries to evaluate the SSL context and shuts down immediately.
278             // properties.put(BrokerConstants.SSL_PORT_PROPERTY_NAME, BrokerConstants.DISABLED_PORT_BIND);
279             properties.put(BrokerConstants.PORT_PROPERTY_NAME, Integer.toString(port));
280         }
281
282         // Authentication
283         IAuthenticator authentificator = null;
284         if (username != null && password != null && username.length() > 0 && password.length() > 0) {
285             properties.setProperty(BrokerConstants.ALLOW_ANONYMOUS_PROPERTY_NAME, Boolean.FALSE.toString());
286             properties.put(BrokerConstants.AUTHENTICATOR_CLASS_NAME,
287                     MqttEmbeddedBrokerUserAuthenticator.class.getName());
288             authentificator = new MqttEmbeddedBrokerUserAuthenticator(username, password.getBytes());
289             logger.debug("Broker authentication is enabled");
290         } else {
291             properties.put(BrokerConstants.ALLOW_ANONYMOUS_PROPERTY_NAME, Boolean.TRUE.toString());
292             logger.debug("Broker anonymous access enabled");
293         }
294
295         if (!persistenceFilename.isEmpty()) { // Persistence: If not set, an in-memory database is used.
296             properties.put(BrokerConstants.PERSISTENT_STORE_PROPERTY_NAME, persistenceFilename);
297             properties.put(BrokerConstants.AUTOSAVE_INTERVAL_PROPERTY_NAME, "30"); // in seconds
298         }
299
300         // We may provide ACL functionality at some point as well
301         IAuthorizatorPolicy authorizer = null;
302         ISslContextCreator sslContextCreator = secure ? nettySSLcontextCreator() : null;
303
304         try {
305             server.startServer(new MemoryConfig(properties), null, sslContextCreator, authentificator, authorizer);
306         } catch (IllegalArgumentException e) {
307             if (e.getMessage().contains("Could not deserialize")) {
308                 Path persistenceFilePath = Paths.get((new File(persistenceFilename)).getAbsolutePath());
309                 logger.warn("persistence corrupt: {}, deleting {}", e.getMessage(), persistenceFilePath);
310                 Files.delete(persistenceFilePath);
311                 // retry starting broker, if it fails again, don't catch exception
312                 server.startServer(new MemoryConfig(properties), null, sslContextCreator, authentificator, authorizer);
313             }
314         }
315         this.server = server;
316         server.addInterceptHandler(metrics);
317         ScheduledExecutorService s = new ScheduledThreadPoolExecutor(1);
318         detectStart.startBrokerStartedDetection(port, s);
319     }
320
321     public void stopEmbeddedServer() {
322         Server server = this.server;
323         if (server != null) {
324             server.removeInterceptHandler(metrics);
325             detectStart.stopBrokerStartDetection();
326             server.stopServer();
327             this.server = null;
328         }
329     }
330
331     /**
332      * For testing: Returns true if the embedded server confirms that the MqttBrokerConnection is connected.
333      */
334     protected boolean serverConfirmsEmbeddedClient() {
335         return server != null && server.listConnectedClients().stream()
336                 .anyMatch(client -> Constants.CLIENTID.equals(client.getClientID()));
337     }
338
339     @Override
340     public void connectionStateChanged(MqttConnectionState state, @Nullable Throwable error) {
341         if (state == MqttConnectionState.CONNECTED) {
342             logger.debug("Embedded broker connection connected");
343         } else if (state == MqttConnectionState.CONNECTING) {
344             logger.debug("Embedded broker connection still connecting");
345         } else {
346             if (error == null) {
347                 logger.warn("Embedded broker offline - Reason unknown");
348             } else {
349                 logger.warn("Embedded broker offline", error);
350             }
351         }
352
353         if (state != MqttConnectionState.CONNECTED && state != MqttConnectionState.CONNECTING) {
354             stopEmbeddedServer();
355         }
356     }
357
358     /**
359      * The callback from the detectStart.startBrokerStartedDetection() call within
360      * {@link #startEmbeddedServer(Integer, boolean, String, String, String)}.
361      */
362     @Override
363     public void mqttEmbeddedBrokerStarted(boolean timeout) {
364         MqttBrokerConnection connection = this.connection;
365         MqttService service = this.service;
366         if (connection == null || service == null) {
367             return;
368         }
369         service.addBrokerConnection(Constants.CLIENTID, connection);
370
371         connection.start().exceptionally(e -> {
372             connectionStateChanged(MqttConnectionState.DISCONNECTED, e);
373             return false;
374         }).thenAccept(v -> {
375             if (!v) {
376                 connectionStateChanged(MqttConnectionState.DISCONNECTED, new TimeoutException("Timeout"));
377             }
378         });
379     }
380
381     public @Nullable MqttBrokerConnection getConnection() {
382         return connection;
383     }
384
385     public String getPersistenceFilename() {
386         return persistenceFilename;
387     }
388
389     public void setPersistenceFilename(String persistenceFilename) {
390         this.persistenceFilename = persistenceFilename;
391     }
392 }