2 * Copyright (c) 2010-2020 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.io.mqttembeddedbroker.internal;
15 import static org.hamcrest.CoreMatchers.is;
16 import static org.junit.Assert.*;
17 import static org.mockito.ArgumentMatchers.*;
18 import static org.mockito.Mockito.verify;
21 import java.io.IOException;
22 import java.nio.file.Path;
23 import java.nio.file.Paths;
24 import java.security.GeneralSecurityException;
25 import java.util.HashMap;
27 import java.util.concurrent.ExecutionException;
28 import java.util.concurrent.Semaphore;
29 import java.util.concurrent.TimeUnit;
31 import javax.naming.ConfigurationException;
33 import org.apache.commons.io.FileUtils;
34 import org.h2.mvstore.MVMap;
35 import org.h2.mvstore.MVStore;
36 import org.junit.After;
37 import org.junit.Before;
38 import org.junit.Test;
39 import org.mockito.Mock;
40 import org.mockito.MockitoAnnotations;
41 import org.openhab.core.config.core.ConfigConstants;
42 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
43 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection.Protocol;
44 import org.openhab.core.io.transport.mqtt.MqttConnectionObserver;
45 import org.openhab.core.io.transport.mqtt.MqttConnectionState;
46 import org.openhab.core.io.transport.mqtt.MqttException;
47 import org.openhab.core.io.transport.mqtt.MqttService;
48 import org.openhab.core.test.java.JavaTest;
50 import io.moquette.broker.RetainedMessage;
51 import io.moquette.broker.subscriptions.Topic;
54 * Tests connections with the embedded broker. Checks for credential based login,
55 * check for SSL connections.
57 * @author David Graeff - Initial contribution
59 public class MqttEmbeddedBrokerServiceTest extends JavaTest {
61 private EmbeddedBrokerService subject;
62 private Map<String, Object> config = new HashMap<>();
63 private @Mock MqttService service;
66 public void setUp() throws ConfigurationException, MqttException, GeneralSecurityException, IOException {
67 MockitoAnnotations.initMocks(this);
69 config.put("username", "username");
70 config.put("password", "password");
71 config.put("port", 12345);
72 config.put("secure", false);
73 config.put("persistenceFile", "");
75 subject = new EmbeddedBrokerService(service, config);
79 public void cleanUp() {
83 public void waitForConnectionChange(MqttBrokerConnection c, MqttConnectionState expectedState)
84 throws InterruptedException {
85 Semaphore semaphore = new Semaphore(1);
88 MqttConnectionObserver mqttConnectionObserver = (state, error) -> {
89 if (state == expectedState) {
93 c.addConnectionObserver(mqttConnectionObserver);
94 if (c.connectionState() == expectedState) {
98 // Start the connection and wait until timeout or connected callback returns.
99 semaphore.tryAcquire(3000, TimeUnit.MILLISECONDS);
101 c.removeConnectionObserver(mqttConnectionObserver);
105 public void connectUnsecureAndTestCredentials() throws InterruptedException, IOException, ExecutionException {
106 MqttBrokerConnection c = subject.getConnection();
108 waitForConnectionChange(c, MqttConnectionState.CONNECTED);
110 assertThat(c.getUser(), is("username"));
111 assertThat(c.getPassword(), is("password"));
113 assertThat(c.connectionState(), is(MqttConnectionState.CONNECTED));
114 verify(service).addBrokerConnection(anyString(), eq(c));
116 // Connect with a second connection but wrong credentials
117 MqttBrokerConnection wrongCredentials = new MqttBrokerConnection(Protocol.TCP, c.getHost(), c.getPort(), false,
119 wrongCredentials.setCredentials("someUser", "somePassword");
121 if (wrongCredentials.start().get()) {
122 fail("Wrong credentials accepted!");
125 wrongCredentials.stop().get();
127 // Connect with a second connection but correct credentials
128 MqttBrokerConnection correctCredentials = new MqttBrokerConnection(Protocol.TCP, c.getHost(), c.getPort(),
129 false, "correctCred");
130 correctCredentials.setCredentials(c.getUser(), c.getPassword());
132 if (!correctCredentials.start().get()) {
133 fail("Couldn't connect although correct credentials");
136 correctCredentials.stop().get();
140 public void connectSecure() throws InterruptedException, IOException {
141 config.put("secure", true);
142 subject.modified(config);
144 MqttBrokerConnection c = subject.getConnection();
147 waitForConnectionChange(c, MqttConnectionState.CONNECTED);
149 assertThat(c.getUser(), is("username"));
150 assertThat(c.getPassword(), is("password"));
152 assertThat(c.connectionState(), is(MqttConnectionState.CONNECTED));
153 verify(service).addBrokerConnection(anyString(), eq(c));
157 public void testPersistence() throws InterruptedException, IOException, ExecutionException {
158 config.put("persistenceFile", "persist.mqtt");
159 Path path = Paths.get(ConfigConstants.getUserDataFolder()).toAbsolutePath();
160 File jksFile = path.resolve("persist.mqtt").toFile();
162 if (jksFile.exists()) {
166 subject.modified(config);
168 MqttBrokerConnection c = subject.getConnection();
171 waitForConnectionChange(c, MqttConnectionState.CONNECTED);
173 c.publish("demotopic", "testtest".getBytes(), 2, true).get();
175 // Stop server -> close persistence storage and sync it to disk
176 subject.deactivate();
177 assertTrue(jksFile.exists());
178 // this is needed to ensure the file is correctly written
179 waitForAssert(() -> assertEquals(12288, jksFile.length()));
181 // The original file is still open, create a temp file for examination
182 File temp = File.createTempFile("abc", ".tmp");
184 FileUtils.copyFile(jksFile, temp);
186 MVStore mvStore = new MVStore.Builder().fileName(temp.getAbsolutePath()).autoCommitDisabled().open();
187 MVMap<Topic, RetainedMessage> openMap = mvStore.openMap("retained_store");
189 assertThat(openMap.size(), is(1));
190 for (Map.Entry<Topic, RetainedMessage> entry : openMap.entrySet()) {
191 assertThat(entry.getKey().toString(), is("demotopic"));
192 assertThat(new String(entry.getValue().getPayload()), is("testtest"));