]> git.basschouten.com Git - openhab-addons.git/blob
516e0a862de360b7288cd4e86c6a1a39209b1992
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2020 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.io.mqttembeddedbroker;
14
15 import static org.hamcrest.CoreMatchers.is;
16 import static org.hamcrest.MatcherAssert.assertThat;
17 import static org.junit.jupiter.api.Assertions.assertTrue;
18 import static org.mockito.MockitoAnnotations.openMocks;
19
20 import java.nio.charset.StandardCharsets;
21 import java.util.ArrayList;
22 import java.util.List;
23 import java.util.concurrent.CompletableFuture;
24 import java.util.concurrent.CountDownLatch;
25 import java.util.concurrent.ExecutionException;
26 import java.util.concurrent.TimeUnit;
27 import java.util.concurrent.TimeoutException;
28
29 import org.junit.jupiter.api.AfterEach;
30 import org.junit.jupiter.api.BeforeEach;
31 import org.junit.jupiter.api.Test;
32 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
33 import org.openhab.core.io.transport.mqtt.MqttConnectionObserver;
34 import org.openhab.core.io.transport.mqtt.MqttConnectionState;
35 import org.openhab.core.io.transport.mqtt.MqttService;
36 import org.openhab.core.test.java.JavaOSGiTest;
37
38 /**
39  * Moquette test
40  *
41  * @author Jan N. Klug - Initial contribution
42  */
43 public class MoquetteTest extends JavaOSGiTest {
44     private static final String TEST_TOPIC = "testtopic";
45
46     private AutoCloseable mocksCloseable;
47
48     private MqttService mqttService;
49     private MqttBrokerConnection embeddedConnection;
50     private MqttBrokerConnection clientConnection;
51
52     /**
53      * Create an observer that fails the test as soon as the broker client connection changes its connection state
54      * to something else then CONNECTED.
55      */
56     private MqttConnectionObserver failIfChange = (state, error) -> assertThat(state,
57             is(MqttConnectionState.CONNECTED));
58
59     @BeforeEach
60     public void beforeEach() throws Exception {
61         registerVolatileStorageService();
62         mocksCloseable = openMocks(this);
63         mqttService = getService(MqttService.class);
64
65         // Wait for the EmbeddedBrokerService internal connection to be connected
66         embeddedConnection = new EmbeddedBrokerTools().waitForConnection(mqttService);
67         embeddedConnection.setQos(1);
68
69         clientConnection = new MqttBrokerConnection(embeddedConnection.getHost(), embeddedConnection.getPort(),
70                 embeddedConnection.isSecure(), "client");
71         clientConnection.setQos(1);
72         clientConnection.start().get(500, TimeUnit.MILLISECONDS);
73         assertThat(clientConnection.connectionState(), is(MqttConnectionState.CONNECTED));
74         // If the connection state changes in between -> fail
75         clientConnection.addConnectionObserver(failIfChange);
76     }
77
78     @AfterEach
79     public void afterEach() throws Exception {
80         if (clientConnection != null) {
81             clientConnection.removeConnectionObserver(failIfChange);
82             clientConnection.stop().get(500, TimeUnit.MILLISECONDS);
83         }
84         mocksCloseable.close();
85     }
86
87     @Test
88     public void singleTopic() throws InterruptedException, ExecutionException, TimeoutException {
89         List<CompletableFuture<Boolean>> futures = new ArrayList<>();
90
91         futures.add(embeddedConnection.publish(TEST_TOPIC, "testPayload".getBytes(StandardCharsets.UTF_8), 1, true));
92
93         CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(1000, TimeUnit.MILLISECONDS);
94
95         CountDownLatch c = new CountDownLatch(1);
96         futures.clear();
97         futures.add(clientConnection.subscribe(TEST_TOPIC, (topic, payload) -> c.countDown()));
98         CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(1000, TimeUnit.MILLISECONDS);
99
100         assertTrue(c.await(1000, TimeUnit.MILLISECONDS));
101     }
102
103     @Test
104     public void multipleTopicsWithSingleSubscription()
105             throws InterruptedException, ExecutionException, TimeoutException {
106         List<CompletableFuture<Boolean>> futures = new ArrayList<>();
107
108         futures.add(embeddedConnection.publish(TEST_TOPIC + "/1", "testPayload1".getBytes(StandardCharsets.UTF_8), 1,
109                 true));
110         futures.add(embeddedConnection.publish(TEST_TOPIC + "/2", "testPayload2".getBytes(StandardCharsets.UTF_8), 1,
111                 true));
112
113         CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(1000, TimeUnit.MILLISECONDS);
114
115         CountDownLatch c = new CountDownLatch(2);
116         futures.clear();
117         futures.add(clientConnection.subscribe(TEST_TOPIC + "/1", (topic, payload) -> c.countDown()));
118         futures.add(clientConnection.subscribe(TEST_TOPIC + "/2", (topic, payload) -> c.countDown()));
119         CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(1000, TimeUnit.MILLISECONDS);
120
121         assertTrue(c.await(1000, TimeUnit.MILLISECONDS));
122     }
123
124     @Test
125     public void multipleTopicsWithHashWildcardSubscription()
126             throws InterruptedException, ExecutionException, TimeoutException {
127         List<CompletableFuture<Boolean>> futures = new ArrayList<>();
128
129         futures.add(embeddedConnection.publish(TEST_TOPIC + "/1", "testPayload1".getBytes(StandardCharsets.UTF_8), 1,
130                 true));
131         futures.add(embeddedConnection.publish(TEST_TOPIC + "/2", "testPayload2".getBytes(StandardCharsets.UTF_8), 1,
132                 true));
133
134         CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(1000, TimeUnit.MILLISECONDS);
135
136         CountDownLatch c = new CountDownLatch(2);
137         futures.clear();
138         futures.add(clientConnection.subscribe(TEST_TOPIC + "/#", (topic, payload) -> c.countDown()));
139         CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(1000, TimeUnit.MILLISECONDS);
140
141         assertTrue(c.await(1000, TimeUnit.MILLISECONDS));
142     }
143
144     @Test
145     public void multipleTopicsWithPlusWildcardSubscription()
146             throws InterruptedException, ExecutionException, TimeoutException {
147         List<CompletableFuture<Boolean>> futures = new ArrayList<>();
148
149         futures.add(embeddedConnection.publish(TEST_TOPIC + "/1", "testPayload1".getBytes(StandardCharsets.UTF_8), 1,
150                 true));
151         futures.add(embeddedConnection.publish(TEST_TOPIC + "/2", "testPayload2".getBytes(StandardCharsets.UTF_8), 1,
152                 true));
153
154         CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(1000, TimeUnit.MILLISECONDS);
155
156         CountDownLatch c = new CountDownLatch(2);
157         futures.clear();
158         futures.add(clientConnection.subscribe(TEST_TOPIC + "/+", (topic, payload) -> c.countDown()));
159         CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(1000, TimeUnit.MILLISECONDS);
160
161         assertTrue(c.await(1000, TimeUnit.MILLISECONDS));
162     }
163 }