]> git.basschouten.com Git - openhab-addons.git/blob
a22a69477681dcb113c3859d6fd891abfd1dceb4
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2020 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.io.mqttembeddedbroker;
14
15 import static org.hamcrest.CoreMatchers.is;
16 import static org.hamcrest.MatcherAssert.assertThat;
17 import static org.junit.jupiter.api.Assertions.assertTrue;
18 import static org.mockito.MockitoAnnotations.openMocks;
19
20 import java.nio.charset.StandardCharsets;
21 import java.util.ArrayList;
22 import java.util.List;
23 import java.util.concurrent.CompletableFuture;
24 import java.util.concurrent.CountDownLatch;
25 import java.util.concurrent.ExecutionException;
26 import java.util.concurrent.TimeUnit;
27 import java.util.concurrent.TimeoutException;
28
29 import org.eclipse.jdt.annotation.NonNullByDefault;
30 import org.junit.jupiter.api.AfterEach;
31 import org.junit.jupiter.api.BeforeEach;
32 import org.junit.jupiter.api.Test;
33 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
34 import org.openhab.core.io.transport.mqtt.MqttConnectionObserver;
35 import org.openhab.core.io.transport.mqtt.MqttConnectionState;
36 import org.openhab.core.io.transport.mqtt.MqttService;
37 import org.openhab.core.test.java.JavaOSGiTest;
38 import org.osgi.service.cm.ConfigurationAdmin;
39
40 /**
41  * Moquette test
42  *
43  * @author Jan N. Klug - Initial contribution
44  */
45 @NonNullByDefault
46 public class MoquetteTest extends JavaOSGiTest {
47     private static final String TEST_TOPIC = "testtopic";
48
49     private @NonNullByDefault({}) AutoCloseable mocksCloseable;
50
51     private @NonNullByDefault({}) ConfigurationAdmin configurationAdmin;
52     private @NonNullByDefault({}) MqttService mqttService;
53     private @NonNullByDefault({}) MqttBrokerConnection embeddedConnection;
54     private @NonNullByDefault({}) MqttBrokerConnection clientConnection;
55
56     /**
57      * Create an observer that fails the test as soon as the broker client connection changes its connection state
58      * to something else then CONNECTED.
59      */
60     private MqttConnectionObserver failIfChange = (state, error) -> assertThat(state,
61             is(MqttConnectionState.CONNECTED));
62
63     @BeforeEach
64     public void beforeEach() throws Exception {
65         registerVolatileStorageService();
66         mocksCloseable = openMocks(this);
67         configurationAdmin = getService(ConfigurationAdmin.class);
68         mqttService = getService(MqttService.class);
69
70         // Wait for the EmbeddedBrokerService internal connection to be connected
71         embeddedConnection = new EmbeddedBrokerTools(configurationAdmin, mqttService).waitForConnection();
72         embeddedConnection.setQos(1);
73
74         clientConnection = new MqttBrokerConnection(embeddedConnection.getHost(), embeddedConnection.getPort(),
75                 embeddedConnection.isSecure(), "client");
76         clientConnection.setQos(1);
77         clientConnection.start().get(500, TimeUnit.MILLISECONDS);
78         assertThat(clientConnection.connectionState(), is(MqttConnectionState.CONNECTED));
79         // If the connection state changes in between -> fail
80         clientConnection.addConnectionObserver(failIfChange);
81     }
82
83     @AfterEach
84     public void afterEach() throws Exception {
85         if (clientConnection != null) {
86             clientConnection.removeConnectionObserver(failIfChange);
87             clientConnection.stop().get(500, TimeUnit.MILLISECONDS);
88         }
89         mocksCloseable.close();
90     }
91
92     private CompletableFuture<Boolean> publish(String topic, String message) {
93         return embeddedConnection.publish(topic, message.getBytes(StandardCharsets.UTF_8), 0, true);
94     }
95
96     @Test
97     public void singleTopic() throws InterruptedException, ExecutionException, TimeoutException {
98         List<CompletableFuture<Boolean>> futures = new ArrayList<>();
99
100         futures.add(publish(TEST_TOPIC, "testPayload"));
101
102         CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(1000, TimeUnit.MILLISECONDS);
103
104         CountDownLatch c = new CountDownLatch(1);
105         futures.clear();
106         futures.add(clientConnection.subscribe(TEST_TOPIC, (topic, payload) -> c.countDown()));
107         CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(1000, TimeUnit.MILLISECONDS);
108
109         assertTrue(c.await(1000, TimeUnit.MILLISECONDS));
110     }
111
112     @Test
113     public void multipleTopicsWithSingleSubscription()
114             throws InterruptedException, ExecutionException, TimeoutException {
115         List<CompletableFuture<Boolean>> futures = new ArrayList<>();
116
117         futures.add(publish(TEST_TOPIC + "/1", "testPayload1"));
118         futures.add(publish(TEST_TOPIC + "/2", "testPayload2"));
119
120         CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(1000, TimeUnit.MILLISECONDS);
121
122         CountDownLatch c = new CountDownLatch(2);
123         futures.clear();
124         futures.add(clientConnection.subscribe(TEST_TOPIC + "/1", (topic, payload) -> c.countDown()));
125         futures.add(clientConnection.subscribe(TEST_TOPIC + "/2", (topic, payload) -> c.countDown()));
126         CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(1000, TimeUnit.MILLISECONDS);
127
128         assertTrue(c.await(1000, TimeUnit.MILLISECONDS));
129     }
130
131     @Test
132     public void multipleTopicsWithHashWildcardSubscription()
133             throws InterruptedException, ExecutionException, TimeoutException {
134         List<CompletableFuture<Boolean>> futures = new ArrayList<>();
135
136         futures.add(publish(TEST_TOPIC + "/1", "testPayload1"));
137         futures.add(publish(TEST_TOPIC + "/2", "testPayload2"));
138
139         CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(1000, TimeUnit.MILLISECONDS);
140
141         CountDownLatch c = new CountDownLatch(2);
142         futures.clear();
143         futures.add(clientConnection.subscribe(TEST_TOPIC + "/#", (topic, payload) -> c.countDown()));
144         CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(1000, TimeUnit.MILLISECONDS);
145
146         assertTrue(c.await(1000, TimeUnit.MILLISECONDS));
147     }
148
149     @Test
150     public void multipleTopicsWithPlusWildcardSubscription()
151             throws InterruptedException, ExecutionException, TimeoutException {
152         List<CompletableFuture<Boolean>> futures = new ArrayList<>();
153
154         futures.add(publish(TEST_TOPIC + "/1", "testPayload1"));
155         futures.add(publish(TEST_TOPIC + "/2", "testPayload2"));
156
157         CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(1000, TimeUnit.MILLISECONDS);
158
159         CountDownLatch c = new CountDownLatch(2);
160         futures.clear();
161         futures.add(clientConnection.subscribe(TEST_TOPIC + "/+", (topic, payload) -> c.countDown()));
162         CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(1000, TimeUnit.MILLISECONDS);
163
164         assertTrue(c.await(1000, TimeUnit.MILLISECONDS));
165     }
166 }