]> git.basschouten.com Git - openhab-addons.git/blob
f15a4245e11d090635c36fbb568e4daca05662a7
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2020 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.io.mqttembeddedbroker;
14
15 import static org.hamcrest.CoreMatchers.is;
16 import static org.hamcrest.MatcherAssert.assertThat;
17 import static org.junit.jupiter.api.Assertions.assertTrue;
18 import static org.mockito.MockitoAnnotations.openMocks;
19
20 import java.nio.charset.StandardCharsets;
21 import java.util.ArrayList;
22 import java.util.List;
23 import java.util.concurrent.CompletableFuture;
24 import java.util.concurrent.CountDownLatch;
25 import java.util.concurrent.ExecutionException;
26 import java.util.concurrent.TimeUnit;
27 import java.util.concurrent.TimeoutException;
28
29 import org.eclipse.jdt.annotation.NonNullByDefault;
30 import org.junit.jupiter.api.AfterEach;
31 import org.junit.jupiter.api.BeforeEach;
32 import org.junit.jupiter.api.Test;
33 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
34 import org.openhab.core.io.transport.mqtt.MqttConnectionObserver;
35 import org.openhab.core.io.transport.mqtt.MqttConnectionState;
36 import org.openhab.core.io.transport.mqtt.MqttService;
37 import org.openhab.core.test.java.JavaOSGiTest;
38
39 /**
40  * Moquette test
41  *
42  * @author Jan N. Klug - Initial contribution
43  */
44 @NonNullByDefault
45 public class MoquetteTest extends JavaOSGiTest {
46     private static final String TEST_TOPIC = "testtopic";
47
48     private @NonNullByDefault({}) AutoCloseable mocksCloseable;
49
50     private @NonNullByDefault({}) MqttService mqttService;
51     private @NonNullByDefault({}) MqttBrokerConnection embeddedConnection;
52     private @NonNullByDefault({}) MqttBrokerConnection clientConnection;
53
54     /**
55      * Create an observer that fails the test as soon as the broker client connection changes its connection state
56      * to something else then CONNECTED.
57      */
58     private MqttConnectionObserver failIfChange = (state, error) -> assertThat(state,
59             is(MqttConnectionState.CONNECTED));
60
61     @BeforeEach
62     public void beforeEach() throws Exception {
63         registerVolatileStorageService();
64         mocksCloseable = openMocks(this);
65         mqttService = getService(MqttService.class);
66
67         // Wait for the EmbeddedBrokerService internal connection to be connected
68         embeddedConnection = new EmbeddedBrokerTools().waitForConnection(mqttService);
69         embeddedConnection.setQos(1);
70
71         clientConnection = new MqttBrokerConnection(embeddedConnection.getHost(), embeddedConnection.getPort(),
72                 embeddedConnection.isSecure(), "client");
73         clientConnection.setQos(1);
74         clientConnection.start().get(500, TimeUnit.MILLISECONDS);
75         assertThat(clientConnection.connectionState(), is(MqttConnectionState.CONNECTED));
76         // If the connection state changes in between -> fail
77         clientConnection.addConnectionObserver(failIfChange);
78     }
79
80     @AfterEach
81     public void afterEach() throws Exception {
82         if (clientConnection != null) {
83             clientConnection.removeConnectionObserver(failIfChange);
84             clientConnection.stop().get(500, TimeUnit.MILLISECONDS);
85         }
86         mocksCloseable.close();
87     }
88
89     private CompletableFuture<Boolean> publish(String topic, String message) {
90         return embeddedConnection.publish(topic, message.getBytes(StandardCharsets.UTF_8), 0, true);
91     }
92
93     @Test
94     public void singleTopic() throws InterruptedException, ExecutionException, TimeoutException {
95         List<CompletableFuture<Boolean>> futures = new ArrayList<>();
96
97         futures.add(publish(TEST_TOPIC, "testPayload"));
98
99         CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(1000, TimeUnit.MILLISECONDS);
100
101         CountDownLatch c = new CountDownLatch(1);
102         futures.clear();
103         futures.add(clientConnection.subscribe(TEST_TOPIC, (topic, payload) -> c.countDown()));
104         CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(1000, TimeUnit.MILLISECONDS);
105
106         assertTrue(c.await(1000, TimeUnit.MILLISECONDS));
107     }
108
109     @Test
110     public void multipleTopicsWithSingleSubscription()
111             throws InterruptedException, ExecutionException, TimeoutException {
112         List<CompletableFuture<Boolean>> futures = new ArrayList<>();
113
114         futures.add(publish(TEST_TOPIC + "/1", "testPayload1"));
115         futures.add(publish(TEST_TOPIC + "/2", "testPayload2"));
116
117         CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(1000, TimeUnit.MILLISECONDS);
118
119         CountDownLatch c = new CountDownLatch(2);
120         futures.clear();
121         futures.add(clientConnection.subscribe(TEST_TOPIC + "/1", (topic, payload) -> c.countDown()));
122         futures.add(clientConnection.subscribe(TEST_TOPIC + "/2", (topic, payload) -> c.countDown()));
123         CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(1000, TimeUnit.MILLISECONDS);
124
125         assertTrue(c.await(1000, TimeUnit.MILLISECONDS));
126     }
127
128     @Test
129     public void multipleTopicsWithHashWildcardSubscription()
130             throws InterruptedException, ExecutionException, TimeoutException {
131         List<CompletableFuture<Boolean>> futures = new ArrayList<>();
132
133         futures.add(publish(TEST_TOPIC + "/1", "testPayload1"));
134         futures.add(publish(TEST_TOPIC + "/2", "testPayload2"));
135
136         CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(1000, TimeUnit.MILLISECONDS);
137
138         CountDownLatch c = new CountDownLatch(2);
139         futures.clear();
140         futures.add(clientConnection.subscribe(TEST_TOPIC + "/#", (topic, payload) -> c.countDown()));
141         CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(1000, TimeUnit.MILLISECONDS);
142
143         assertTrue(c.await(1000, TimeUnit.MILLISECONDS));
144     }
145
146     @Test
147     public void multipleTopicsWithPlusWildcardSubscription()
148             throws InterruptedException, ExecutionException, TimeoutException {
149         List<CompletableFuture<Boolean>> futures = new ArrayList<>();
150
151         futures.add(publish(TEST_TOPIC + "/1", "testPayload1"));
152         futures.add(publish(TEST_TOPIC + "/2", "testPayload2"));
153
154         CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(1000, TimeUnit.MILLISECONDS);
155
156         CountDownLatch c = new CountDownLatch(2);
157         futures.clear();
158         futures.add(clientConnection.subscribe(TEST_TOPIC + "/+", (topic, payload) -> c.countDown()));
159         CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(1000, TimeUnit.MILLISECONDS);
160
161         assertTrue(c.await(1000, TimeUnit.MILLISECONDS));
162     }
163 }