]> git.basschouten.com Git - openhab-addons.git/blob
005ddc3af3e0e4228923486476fede59a3b28f95
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2020 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.io.mqttembeddedbroker.internal;
14
15 import static org.hamcrest.CoreMatchers.is;
16 import static org.junit.Assert.*;
17 import static org.mockito.ArgumentMatchers.*;
18 import static org.mockito.Mockito.verify;
19
20 import java.io.File;
21 import java.io.IOException;
22 import java.nio.file.Path;
23 import java.nio.file.Paths;
24 import java.security.GeneralSecurityException;
25 import java.util.HashMap;
26 import java.util.Map;
27 import java.util.concurrent.ExecutionException;
28 import java.util.concurrent.Semaphore;
29 import java.util.concurrent.TimeUnit;
30
31 import javax.naming.ConfigurationException;
32
33 import org.apache.commons.io.FileUtils;
34 import org.h2.mvstore.MVMap;
35 import org.h2.mvstore.MVStore;
36 import org.junit.After;
37 import org.junit.Before;
38 import org.junit.Test;
39 import org.mockito.Mock;
40 import org.mockito.MockitoAnnotations;
41 import org.openhab.core.config.core.ConfigConstants;
42 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
43 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection.Protocol;
44 import org.openhab.core.io.transport.mqtt.MqttConnectionObserver;
45 import org.openhab.core.io.transport.mqtt.MqttConnectionState;
46 import org.openhab.core.io.transport.mqtt.MqttException;
47 import org.openhab.core.io.transport.mqtt.MqttService;
48 import org.openhab.core.test.java.JavaTest;
49
50 import io.moquette.broker.RetainedMessage;
51 import io.moquette.broker.subscriptions.Topic;
52
53 /**
54  * Tests connections with the embedded broker. Checks for credential based login,
55  * check for SSL connections.
56  *
57  * @author David Graeff - Initial contribution
58  */
59 public class MqttEmbeddedBrokerServiceTest extends JavaTest {
60
61     private EmbeddedBrokerService subject;
62     private Map<String, Object> config = new HashMap<>();
63     private @Mock MqttService service;
64
65     @Before
66     public void setUp() throws ConfigurationException, MqttException, GeneralSecurityException, IOException {
67         MockitoAnnotations.initMocks(this);
68
69         config.put("username", "username");
70         config.put("password", "password");
71         config.put("port", 12345);
72         config.put("secure", false);
73         config.put("persistenceFile", "");
74
75         subject = new EmbeddedBrokerService(service, config);
76     }
77
78     @After
79     public void cleanUp() {
80         subject.deactivate();
81     }
82
83     public void waitForConnectionChange(MqttBrokerConnection c, MqttConnectionState expectedState)
84             throws InterruptedException {
85         Semaphore semaphore = new Semaphore(1);
86         semaphore.acquire();
87
88         MqttConnectionObserver mqttConnectionObserver = (state, error) -> {
89             if (state == expectedState) {
90                 semaphore.release();
91             }
92         };
93         c.addConnectionObserver(mqttConnectionObserver);
94         if (c.connectionState() == expectedState) {
95             semaphore.release();
96         }
97
98         // Start the connection and wait until timeout or connected callback returns.
99         semaphore.tryAcquire(3000, TimeUnit.MILLISECONDS);
100
101         c.removeConnectionObserver(mqttConnectionObserver);
102     }
103
104     @Test
105     public void connectUnsecureAndTestCredentials() throws InterruptedException, IOException, ExecutionException {
106         MqttBrokerConnection c = subject.getConnection();
107         assertNotNull(c);
108         waitForConnectionChange(c, MqttConnectionState.CONNECTED);
109
110         assertThat(c.getUser(), is("username"));
111         assertThat(c.getPassword(), is("password"));
112
113         assertThat(c.connectionState(), is(MqttConnectionState.CONNECTED));
114         verify(service).addBrokerConnection(anyString(), eq(c));
115
116         // Connect with a second connection but wrong credentials
117         MqttBrokerConnection wrongCredentials = new MqttBrokerConnection(Protocol.TCP, c.getHost(), c.getPort(), false,
118                 "wrongCred");
119         wrongCredentials.setCredentials("someUser", "somePassword");
120
121         if (wrongCredentials.start().get()) {
122             fail("Wrong credentials accepted!");
123         }
124
125         wrongCredentials.stop().get();
126
127         // Connect with a second connection but correct credentials
128         MqttBrokerConnection correctCredentials = new MqttBrokerConnection(Protocol.TCP, c.getHost(), c.getPort(),
129                 false, "correctCred");
130         correctCredentials.setCredentials(c.getUser(), c.getPassword());
131
132         if (!correctCredentials.start().get()) {
133             fail("Couldn't connect although correct credentials");
134         }
135
136         correctCredentials.stop().get();
137     }
138
139     @Test
140     public void connectSecure() throws InterruptedException, IOException {
141         config.put("secure", true);
142         subject.modified(config);
143
144         MqttBrokerConnection c = subject.getConnection();
145         assertNotNull(c);
146
147         waitForConnectionChange(c, MqttConnectionState.CONNECTED);
148
149         assertThat(c.getUser(), is("username"));
150         assertThat(c.getPassword(), is("password"));
151
152         assertThat(c.connectionState(), is(MqttConnectionState.CONNECTED));
153         verify(service).addBrokerConnection(anyString(), eq(c));
154     }
155
156     @Test
157     public void testPersistence() throws InterruptedException, IOException, ExecutionException {
158         config.put("persistenceFile", "persist.mqtt");
159         Path path = Paths.get(ConfigConstants.getUserDataFolder()).toAbsolutePath();
160         File jksFile = path.resolve("persist.mqtt").toFile();
161
162         if (jksFile.exists()) {
163             jksFile.delete();
164         }
165
166         subject.modified(config);
167
168         MqttBrokerConnection c = subject.getConnection();
169         assertNotNull(c);
170
171         waitForConnectionChange(c, MqttConnectionState.CONNECTED);
172
173         c.publish("demotopic", "testtest".getBytes(), 2, true).get();
174
175         // Stop server -> close persistence storage and sync it to disk
176         subject.deactivate();
177         assertTrue(jksFile.exists());
178         // this is needed to ensure the file is correctly written
179         waitForAssert(() -> assertEquals(12288, jksFile.length()));
180
181         // The original file is still open, create a temp file for examination
182         File temp = File.createTempFile("abc", ".tmp");
183         temp.deleteOnExit();
184         FileUtils.copyFile(jksFile, temp);
185
186         MVStore mvStore = new MVStore.Builder().fileName(temp.getAbsolutePath()).autoCommitDisabled().open();
187         MVMap<Topic, RetainedMessage> openMap = mvStore.openMap("retained_store");
188
189         assertThat(openMap.size(), is(1));
190         for (Map.Entry<Topic, RetainedMessage> entry : openMap.entrySet()) {
191             assertThat(entry.getKey().toString(), is("demotopic"));
192             assertThat(new String(entry.getValue().getPayload()), is("testtest"));
193         }
194     }
195 }