]> git.basschouten.com Git - openhab-addons.git/blob
92eb4f408ad1317968995612bc3f1932cc1af704
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2020 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.io.mqttembeddedbroker.internal;
14
15 import static org.hamcrest.CoreMatchers.is;
16 import static org.hamcrest.MatcherAssert.assertThat;
17 import static org.junit.jupiter.api.Assertions.*;
18 import static org.mockito.ArgumentMatchers.*;
19 import static org.mockito.Mockito.verify;
20
21 import java.io.File;
22 import java.io.IOException;
23 import java.nio.file.Path;
24 import java.nio.file.Paths;
25 import java.security.GeneralSecurityException;
26 import java.util.HashMap;
27 import java.util.Map;
28 import java.util.concurrent.ExecutionException;
29 import java.util.concurrent.Semaphore;
30 import java.util.concurrent.TimeUnit;
31
32 import javax.naming.ConfigurationException;
33
34 import org.apache.commons.io.FileUtils;
35 import org.h2.mvstore.MVMap;
36 import org.h2.mvstore.MVStore;
37 import org.junit.jupiter.api.AfterEach;
38 import org.junit.jupiter.api.BeforeEach;
39 import org.junit.jupiter.api.Test;
40 import org.junit.jupiter.api.extension.ExtendWith;
41 import org.mockito.Mock;
42 import org.mockito.junit.jupiter.MockitoExtension;
43 import org.openhab.core.OpenHAB;
44 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
45 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection.Protocol;
46 import org.openhab.core.io.transport.mqtt.MqttConnectionObserver;
47 import org.openhab.core.io.transport.mqtt.MqttConnectionState;
48 import org.openhab.core.io.transport.mqtt.MqttException;
49 import org.openhab.core.io.transport.mqtt.MqttService;
50 import org.openhab.core.test.java.JavaTest;
51
52 import io.moquette.broker.RetainedMessage;
53 import io.moquette.broker.subscriptions.Topic;
54
55 /**
56  * Tests connections with the embedded broker. Checks for credential based login,
57  * check for SSL connections.
58  *
59  * @author David Graeff - Initial contribution
60  */
61 @ExtendWith(MockitoExtension.class)
62 public class MqttEmbeddedBrokerServiceTest extends JavaTest {
63
64     private EmbeddedBrokerService subject;
65     private Map<String, Object> config = new HashMap<>();
66     private @Mock MqttService service;
67
68     @BeforeEach
69     public void setUp() throws ConfigurationException, MqttException, GeneralSecurityException, IOException {
70         config.put("username", "username");
71         config.put("password", "password");
72         config.put("port", 12345);
73         config.put("secure", false);
74         config.put("persistenceFile", "");
75
76         subject = new EmbeddedBrokerService(service, config);
77     }
78
79     @AfterEach
80     public void cleanUp() {
81         subject.deactivate();
82     }
83
84     public void waitForConnectionChange(MqttBrokerConnection c, MqttConnectionState expectedState)
85             throws InterruptedException {
86         Semaphore semaphore = new Semaphore(1);
87         semaphore.acquire();
88
89         MqttConnectionObserver mqttConnectionObserver = (state, error) -> {
90             if (state == expectedState) {
91                 semaphore.release();
92             }
93         };
94         c.addConnectionObserver(mqttConnectionObserver);
95         if (c.connectionState() == expectedState) {
96             semaphore.release();
97         }
98
99         // Start the connection and wait until timeout or connected callback returns.
100         semaphore.tryAcquire(3000, TimeUnit.MILLISECONDS);
101
102         c.removeConnectionObserver(mqttConnectionObserver);
103     }
104
105     @Test
106     public void connectUnsecureAndTestCredentials() throws InterruptedException, IOException, ExecutionException {
107         MqttBrokerConnection c = subject.getConnection();
108         assertNotNull(c);
109         waitForConnectionChange(c, MqttConnectionState.CONNECTED);
110
111         assertThat(c.getUser(), is("username"));
112         assertThat(c.getPassword(), is("password"));
113
114         assertThat(c.connectionState(), is(MqttConnectionState.CONNECTED));
115         verify(service).addBrokerConnection(anyString(), eq(c));
116
117         // Connect with a second connection but wrong credentials
118         MqttBrokerConnection wrongCredentials = new MqttBrokerConnection(Protocol.TCP, c.getHost(), c.getPort(), false,
119                 "wrongCred");
120         wrongCredentials.setCredentials("someUser", "somePassword");
121
122         if (wrongCredentials.start().get()) {
123             fail("Wrong credentials accepted!");
124         }
125
126         wrongCredentials.stop().get();
127
128         // Connect with a second connection but correct credentials
129         MqttBrokerConnection correctCredentials = new MqttBrokerConnection(Protocol.TCP, c.getHost(), c.getPort(),
130                 false, "correctCred");
131         correctCredentials.setCredentials(c.getUser(), c.getPassword());
132
133         if (!correctCredentials.start().get()) {
134             fail("Couldn't connect although correct credentials");
135         }
136
137         correctCredentials.stop().get();
138     }
139
140     @Test
141     public void connectSecure() throws InterruptedException, IOException {
142         config.put("secure", true);
143         subject.modified(config);
144
145         MqttBrokerConnection c = subject.getConnection();
146         assertNotNull(c);
147
148         waitForConnectionChange(c, MqttConnectionState.CONNECTED);
149
150         assertThat(c.getUser(), is("username"));
151         assertThat(c.getPassword(), is("password"));
152
153         assertThat(c.connectionState(), is(MqttConnectionState.CONNECTED));
154         verify(service).addBrokerConnection(anyString(), eq(c));
155     }
156
157     @Test
158     public void testPersistence() throws InterruptedException, IOException, ExecutionException {
159         config.put("persistenceFile", "persist.mqtt");
160         Path path = Paths.get(OpenHAB.getUserDataFolder()).toAbsolutePath();
161         File jksFile = path.resolve("persist.mqtt").toFile();
162
163         if (jksFile.exists()) {
164             jksFile.delete();
165         }
166
167         subject.modified(config);
168
169         MqttBrokerConnection c = subject.getConnection();
170         assertNotNull(c);
171
172         waitForConnectionChange(c, MqttConnectionState.CONNECTED);
173
174         c.publish("demotopic", "testtest".getBytes(), 2, true).get();
175
176         // Stop server -> close persistence storage and sync it to disk
177         subject.deactivate();
178         assertTrue(jksFile.exists());
179         // this is needed to ensure the file is correctly written
180         waitForAssert(() -> assertEquals(12288, jksFile.length()));
181
182         // The original file is still open, create a temp file for examination
183         File temp = File.createTempFile("abc", ".tmp");
184         temp.deleteOnExit();
185         FileUtils.copyFile(jksFile, temp);
186
187         MVStore mvStore = new MVStore.Builder().fileName(temp.getAbsolutePath()).autoCommitDisabled().open();
188         MVMap<Topic, RetainedMessage> openMap = mvStore.openMap("retained_store");
189
190         assertThat(openMap.size(), is(1));
191         for (Map.Entry<Topic, RetainedMessage> entry : openMap.entrySet()) {
192             assertThat(entry.getKey().toString(), is("demotopic"));
193             assertThat(new String(entry.getValue().getPayload()), is("testtest"));
194         }
195     }
196 }