]> git.basschouten.com Git - openhab-addons.git/blob
a87cada201505a6c64b059c5d56b603c37505253
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2020 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.io.mqttembeddedbroker.internal;
14
15 import java.io.File;
16 import java.io.IOException;
17 import java.io.InputStream;
18 import java.nio.file.Files;
19 import java.nio.file.Path;
20 import java.nio.file.Paths;
21 import java.security.KeyStore;
22 import java.security.KeyStoreException;
23 import java.security.NoSuchAlgorithmException;
24 import java.security.UnrecoverableKeyException;
25 import java.security.cert.CertificateException;
26 import java.util.Map;
27 import java.util.Properties;
28 import java.util.concurrent.ExecutionException;
29 import java.util.concurrent.ScheduledExecutorService;
30 import java.util.concurrent.ScheduledThreadPoolExecutor;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.TimeoutException;
33
34 import javax.net.ssl.KeyManagerFactory;
35
36 import org.eclipse.jdt.annotation.NonNullByDefault;
37 import org.eclipse.jdt.annotation.Nullable;
38 import org.openhab.core.OpenHAB;
39 import org.openhab.core.config.core.ConfigurableService;
40 import org.openhab.core.config.core.Configuration;
41 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
42 import org.openhab.core.io.transport.mqtt.MqttConnectionObserver;
43 import org.openhab.core.io.transport.mqtt.MqttConnectionState;
44 import org.openhab.core.io.transport.mqtt.MqttService;
45 import org.openhab.core.io.transport.mqtt.MqttServiceObserver;
46 import org.openhab.io.mqttembeddedbroker.Constants;
47 import org.openhab.io.mqttembeddedbroker.internal.MqttEmbeddedBrokerDetectStart.MqttEmbeddedBrokerStartedListener;
48 import org.osgi.service.component.annotations.Activate;
49 import org.osgi.service.component.annotations.Component;
50 import org.osgi.service.component.annotations.Deactivate;
51 import org.osgi.service.component.annotations.Modified;
52 import org.osgi.service.component.annotations.Reference;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
55
56 import io.moquette.BrokerConstants;
57 import io.moquette.broker.ISslContextCreator;
58 import io.moquette.broker.Server;
59 import io.moquette.broker.config.MemoryConfig;
60 import io.moquette.broker.security.IAuthenticator;
61 import io.moquette.broker.security.IAuthorizatorPolicy;
62 import io.moquette.interception.InterceptHandler;
63 import io.moquette.interception.messages.InterceptAcknowledgedMessage;
64 import io.moquette.interception.messages.InterceptConnectMessage;
65 import io.moquette.interception.messages.InterceptConnectionLostMessage;
66 import io.moquette.interception.messages.InterceptDisconnectMessage;
67 import io.moquette.interception.messages.InterceptPublishMessage;
68 import io.moquette.interception.messages.InterceptSubscribeMessage;
69 import io.moquette.interception.messages.InterceptUnsubscribeMessage;
70 import io.netty.handler.ssl.SslContext;
71 import io.netty.handler.ssl.SslContextBuilder;
72
73 /**
74  * The {@link EmbeddedBrokerService} starts the embedded broker, creates a
75  * {@link MqttBrokerConnection} and adds it to the {@link MqttService}.
76  * <p>
77  * For now tls connections are offered with an accept-all trust manager
78  * and a predefined keystore if "secure" is set to true.
79  *
80  * @author David Graeff - Initial contribution
81  */
82 @Component(immediate = true, service = EmbeddedBrokerService.class, configurationPid = Constants.PID, //
83         property = org.osgi.framework.Constants.SERVICE_PID + "=" + Constants.PID)
84 @ConfigurableService(category = "MQTT", label = "MQTT Embedded Broker", description_uri = "mqtt:mqttembeddedbroker")
85 @NonNullByDefault
86 public class EmbeddedBrokerService
87         implements MqttConnectionObserver, MqttServiceObserver, MqttEmbeddedBrokerStartedListener {
88     private final MqttService service;
89     private String persistenceFilename = "";
90     // private NetworkServerTls networkServerTls; //TODO wait for NetworkServerTls implementation
91
92     @NonNullByDefault({})
93     class BrokerMetricsListenerEx implements InterceptHandler {
94
95         @Override
96         public String getID() {
97             return "logger";
98         }
99
100         @Override
101         public Class<?>[] getInterceptedMessageTypes() {
102             return new Class<?>[] { InterceptConnectMessage.class, InterceptDisconnectMessage.class };
103         }
104
105         @Override
106         public void onConnect(InterceptConnectMessage arg0) {
107             logger.debug("MQTT Client connected: {}", arg0.getClientID());
108         }
109
110         @Override
111         public void onConnectionLost(InterceptConnectionLostMessage arg0) {
112         }
113
114         @Override
115         public void onDisconnect(InterceptDisconnectMessage arg0) {
116             logger.debug("MQTT Client disconnected: {}", arg0.getClientID());
117         }
118
119         @Override
120         public void onMessageAcknowledged(InterceptAcknowledgedMessage arg0) {
121         }
122
123         @Override
124         public void onPublish(InterceptPublishMessage arg0) {
125         }
126
127         @Override
128         public void onSubscribe(InterceptSubscribeMessage arg0) {
129         }
130
131         @Override
132         public void onUnsubscribe(InterceptUnsubscribeMessage arg0) {
133         }
134     }
135
136     protected @Nullable Server server;
137     private final Logger logger = LoggerFactory.getLogger(EmbeddedBrokerService.class);
138     protected MqttEmbeddedBrokerDetectStart detectStart = new MqttEmbeddedBrokerDetectStart(this);
139     protected BrokerMetricsListenerEx metrics = new BrokerMetricsListenerEx();
140
141     private @Nullable MqttBrokerConnection connection;
142
143     @Activate
144     public EmbeddedBrokerService(@Reference MqttService mqttService, Map<String, Object> configuration)
145             throws IOException {
146         this.service = mqttService;
147         initialize(configuration);
148     }
149
150     @Modified
151     public void modified(Map<String, Object> configuration) throws IOException {
152         deactivate();
153         initialize(configuration);
154     }
155
156     public void initialize(Map<String, Object> configuration) throws IOException {
157         ServiceConfiguration config = new Configuration(configuration).as(ServiceConfiguration.class);
158         int port = config.port == null ? (config.port = config.secure ? 8883 : 1883) : config.port;
159
160         // Create MqttBrokerConnection
161         connection = service.getBrokerConnection(Constants.CLIENTID);
162         if (connection != null) {
163             // Close the existing connection and remove it from the service
164             connection.stop();
165             service.removeBrokerConnection(Constants.CLIENTID);
166         }
167
168         connection = new MqttBrokerConnection("localhost", config.port, config.secure, Constants.CLIENTID);
169         connection.addConnectionObserver(this);
170
171         if (config.username != null) {
172             connection.setCredentials(config.username, config.password);
173         }
174
175         if (!config.persistenceFile.isEmpty()) {
176             final String persistenceFilename = config.persistenceFile;
177             if (!Paths.get(persistenceFilename).isAbsolute()) {
178                 Path path = Paths.get(OpenHAB.getUserDataFolder()).toAbsolutePath();
179                 Files.createDirectories(path);
180                 this.persistenceFilename = path.resolve(persistenceFilename).toString();
181             }
182
183             logger.info("Broker persistence file: {}", persistenceFilename);
184         } else {
185             logger.info("Using in-memory persistence. No persistence file has been set!");
186         }
187
188         // Start embedded server
189         startEmbeddedServer(port, config.secure, config.username, config.password);
190     }
191
192     @Deactivate
193     public void deactivate() {
194         if (service != null) {
195             service.removeBrokersListener(this);
196         }
197         MqttBrokerConnection connection = this.connection;
198         if (connection == null) {
199             if (server != null) {
200                 server.stopServer();
201             }
202             server = null;
203             return;
204         }
205
206         // Clean shutdown: Stop connection, wait for process to finish, shutdown server
207         connection.removeConnectionObserver(this);
208         try {
209             connection.stop().thenRun(() -> {
210                 if (server != null) {
211                     server.stopServer();
212                     server = null;
213                 }
214             }).get(10, TimeUnit.SECONDS);
215         } catch (InterruptedException | ExecutionException | TimeoutException e) {
216             logger.warn("Could not cleanly shutdown connection or server.", e);
217         }
218         connection = null;
219     }
220
221     @Override
222     public void brokerAdded(String brokerID, MqttBrokerConnection broker) {
223     }
224
225     @SuppressWarnings("null")
226     @Override
227     public void brokerRemoved(String brokerID, MqttBrokerConnection broker) {
228         // Do not allow this connection to be removed. Add it again.
229         if (broker.equals(connection)) {
230             service.addBrokerConnection(brokerID, broker);
231         }
232     }
233
234     /**
235      * For TLS connections we need to setup a keystore and provide Moquette/Netty with an {@link SslContext}.
236      * <p>
237      * If a context is requested by Moquette, this creator
238      * will use the bundled "serverkeystore.keystore" with password "openhab".
239      *
240      * @return An SslContext creator (not be confused with javas SSLContext).
241      */
242     ISslContextCreator nettySSLcontextCreator() {
243         return () -> {
244             try {
245                 InputStream inputStream = getClass().getClassLoader().getResourceAsStream("serverkeystore.keystore");
246                 KeyStore keyStore = KeyStore.getInstance("jks");
247                 keyStore.load(inputStream, "openhab".toCharArray());
248                 KeyManagerFactory factory = KeyManagerFactory.getInstance("SunX509");
249                 factory.init(keyStore, "openhab".toCharArray());
250                 return SslContextBuilder.forServer(factory).build();
251             } catch (NoSuchAlgorithmException | CertificateException | IOException | KeyStoreException
252                     | UnrecoverableKeyException e) {
253                 logger.warn("Failed to create an SSL context");
254                 return null;
255             }
256         };
257     }
258
259     public void startEmbeddedServer(@Nullable Integer portParam, boolean secure, @Nullable String username,
260             @Nullable String password) throws IOException {
261         Server server = new Server();
262         Properties properties = new Properties();
263
264         // Host and port
265         properties.put(BrokerConstants.HOST_PROPERTY_NAME, "0.0.0.0");
266         int port;
267         if (secure) {
268             port = (portParam == null) ? port = 8883 : portParam;
269             properties.put(BrokerConstants.SSL_PORT_PROPERTY_NAME, Integer.toString(port));
270             properties.put(BrokerConstants.PORT_PROPERTY_NAME, BrokerConstants.DISABLED_PORT_BIND);
271             properties.put(BrokerConstants.KEY_MANAGER_PASSWORD_PROPERTY_NAME, "esheshesh");
272             properties.put(BrokerConstants.JKS_PATH_PROPERTY_NAME, "serverkeystore.jks");
273         } else {
274             port = (portParam == null) ? port = 1883 : portParam;
275             // with SSL_PORT_PROPERTY_NAME set, netty tries to evaluate the SSL context and shuts down immediately.
276             // properties.put(BrokerConstants.SSL_PORT_PROPERTY_NAME, BrokerConstants.DISABLED_PORT_BIND);
277             properties.put(BrokerConstants.PORT_PROPERTY_NAME, Integer.toString(port));
278         }
279
280         // Authentication
281         IAuthenticator authentificator = null;
282         if (username != null && password != null && username.length() > 0 && password.length() > 0) {
283             properties.setProperty(BrokerConstants.ALLOW_ANONYMOUS_PROPERTY_NAME, Boolean.FALSE.toString());
284             properties.put(BrokerConstants.AUTHENTICATOR_CLASS_NAME,
285                     MqttEmbeddedBrokerUserAuthenticator.class.getName());
286             authentificator = new MqttEmbeddedBrokerUserAuthenticator(username, password.getBytes());
287             logger.debug("Broker authentication is enabled");
288         } else {
289             properties.put(BrokerConstants.ALLOW_ANONYMOUS_PROPERTY_NAME, Boolean.TRUE.toString());
290             logger.debug("Broker anonymous access enabled");
291         }
292
293         if (!persistenceFilename.isEmpty()) { // Persistence: If not set, an in-memory database is used.
294             properties.put(BrokerConstants.PERSISTENT_STORE_PROPERTY_NAME, persistenceFilename);
295             properties.put(BrokerConstants.AUTOSAVE_INTERVAL_PROPERTY_NAME, "30"); // in seconds
296         }
297
298         // We may provide ACL functionality at some point as well
299         IAuthorizatorPolicy authorizer = null;
300         ISslContextCreator sslContextCreator = secure ? nettySSLcontextCreator() : null;
301
302         try {
303             server.startServer(new MemoryConfig(properties), null, sslContextCreator, authentificator, authorizer);
304         } catch (IllegalArgumentException e) {
305             if (e.getMessage().contains("Could not deserialize")) {
306                 Path persistenceFilePath = Paths.get((new File(persistenceFilename)).getAbsolutePath());
307                 logger.warn("persistence corrupt: {}, deleting {}", e.getMessage(), persistenceFilePath);
308                 Files.delete(persistenceFilePath);
309                 // retry starting broker, if it fails again, don't catch exception
310                 server.startServer(new MemoryConfig(properties), null, sslContextCreator, authentificator, authorizer);
311             }
312         } catch (Exception e) {
313             logger.warn("Failed to start embedded MQTT server: {}", e.getMessage());
314             server.stopServer();
315             return;
316         }
317
318         this.server = server;
319         server.addInterceptHandler(metrics);
320         ScheduledExecutorService s = new ScheduledThreadPoolExecutor(1);
321         detectStart.startBrokerStartedDetection(port, s);
322     }
323
324     public void stopEmbeddedServer() {
325         Server server = this.server;
326         if (server != null) {
327             server.removeInterceptHandler(metrics);
328             detectStart.stopBrokerStartDetection();
329             server.stopServer();
330             this.server = null;
331         }
332     }
333
334     /**
335      * For testing: Returns true if the embedded server confirms that the MqttBrokerConnection is connected.
336      */
337     protected boolean serverConfirmsEmbeddedClient() {
338         return server != null && server.listConnectedClients().stream()
339                 .anyMatch(client -> Constants.CLIENTID.equals(client.getClientID()));
340     }
341
342     @Override
343     public void connectionStateChanged(MqttConnectionState state, @Nullable Throwable error) {
344         if (state == MqttConnectionState.CONNECTED) {
345             logger.debug("Embedded broker connection connected");
346         } else if (state == MqttConnectionState.CONNECTING) {
347             logger.debug("Embedded broker connection still connecting");
348         } else {
349             if (error == null) {
350                 logger.warn("Embedded broker offline - Reason unknown");
351             } else {
352                 logger.warn("Embedded broker offline", error);
353             }
354         }
355
356         if (state != MqttConnectionState.CONNECTED && state != MqttConnectionState.CONNECTING) {
357             stopEmbeddedServer();
358         }
359     }
360
361     /**
362      * The callback from the detectStart.startBrokerStartedDetection() call within
363      * {@link #startEmbeddedServer(Integer, boolean, String, String, String)}.
364      */
365     @Override
366     public void mqttEmbeddedBrokerStarted(boolean timeout) {
367         MqttBrokerConnection connection = this.connection;
368         MqttService service = this.service;
369         if (connection == null || service == null) {
370             return;
371         }
372         service.addBrokerConnection(Constants.CLIENTID, connection);
373
374         connection.start().exceptionally(e -> {
375             connectionStateChanged(MqttConnectionState.DISCONNECTED, e);
376             return false;
377         }).thenAccept(v -> {
378             if (!v) {
379                 connectionStateChanged(MqttConnectionState.DISCONNECTED, new TimeoutException("Timeout"));
380             }
381         });
382     }
383
384     public @Nullable MqttBrokerConnection getConnection() {
385         return connection;
386     }
387
388     public String getPersistenceFilename() {
389         return persistenceFilename;
390     }
391
392     public void setPersistenceFilename(String persistenceFilename) {
393         this.persistenceFilename = persistenceFilename;
394     }
395 }