]> git.basschouten.com Git - openhab-addons.git/blob
1da3d3f29e78844482bbd4bb1cf4d0064bf20b49
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2020 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.io.mqttembeddedbroker;
14
15 import static org.hamcrest.CoreMatchers.is;
16 import static org.junit.Assert.*;
17 import static org.mockito.MockitoAnnotations.initMocks;
18
19 import java.nio.charset.StandardCharsets;
20 import java.util.ArrayList;
21 import java.util.List;
22 import java.util.concurrent.CompletableFuture;
23 import java.util.concurrent.CountDownLatch;
24 import java.util.concurrent.ExecutionException;
25 import java.util.concurrent.TimeUnit;
26 import java.util.concurrent.TimeoutException;
27
28 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
29 import org.openhab.core.io.transport.mqtt.MqttConnectionObserver;
30 import org.openhab.core.io.transport.mqtt.MqttConnectionState;
31 import org.openhab.core.io.transport.mqtt.MqttService;
32 import org.openhab.core.test.java.JavaOSGiTest;
33 import org.junit.After;
34 import org.junit.Before;
35 import org.junit.Test;
36
37 /**
38  * Moquette test
39  *
40  * @author Jan N. Klug - Initial contribution
41  */
42 public class MoquetteTest extends JavaOSGiTest {
43     private static final String TEST_TOPIC = "testtopic";
44
45     private MqttService mqttService;
46     private MqttBrokerConnection embeddedConnection;
47     private MqttBrokerConnection clientConnection;
48
49     /**
50      * Create an observer that fails the test as soon as the broker client connection changes its connection state
51      * to something else then CONNECTED.
52      */
53     private MqttConnectionObserver failIfChange = (state, error) -> assertThat(state,
54             is(MqttConnectionState.CONNECTED));
55
56     @Before
57     public void setUp() throws InterruptedException, ExecutionException, TimeoutException {
58         registerVolatileStorageService();
59         initMocks(this);
60         mqttService = getService(MqttService.class);
61
62         // Wait for the EmbeddedBrokerService internal connection to be connected
63         embeddedConnection = new EmbeddedBrokerTools().waitForConnection(mqttService);
64         embeddedConnection.setQos(1);
65
66         clientConnection = new MqttBrokerConnection(embeddedConnection.getHost(), embeddedConnection.getPort(),
67                 embeddedConnection.isSecure(), "client");
68         clientConnection.setQos(1);
69         clientConnection.start().get(500, TimeUnit.MILLISECONDS);
70         assertThat(clientConnection.connectionState(), is(MqttConnectionState.CONNECTED));
71         // If the connection state changes in between -> fail
72         clientConnection.addConnectionObserver(failIfChange);
73     }
74
75     @After
76     public void tearDown() throws InterruptedException, ExecutionException, TimeoutException {
77         if (clientConnection != null) {
78             clientConnection.removeConnectionObserver(failIfChange);
79             clientConnection.stop().get(500, TimeUnit.MILLISECONDS);
80         }
81     }
82
83     @Test
84     public void singleTopic() throws InterruptedException, ExecutionException, TimeoutException {
85         List<CompletableFuture<Boolean>> futures = new ArrayList<>();
86
87         futures.add(embeddedConnection.publish(TEST_TOPIC, "testPayload".getBytes(StandardCharsets.UTF_8), 1, true));
88
89         CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(1000, TimeUnit.MILLISECONDS);
90
91         CountDownLatch c = new CountDownLatch(1);
92         futures.clear();
93         futures.add(clientConnection.subscribe(TEST_TOPIC, (topic, payload) -> c.countDown()));
94         CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(1000, TimeUnit.MILLISECONDS);
95
96         assertTrue(c.await(1000, TimeUnit.MILLISECONDS));
97     }
98
99     @Test
100     public void multipleTopicsWithSingleSubscription()
101             throws InterruptedException, ExecutionException, TimeoutException {
102         List<CompletableFuture<Boolean>> futures = new ArrayList<>();
103
104         futures.add(embeddedConnection.publish(TEST_TOPIC + "/1", "testPayload1".getBytes(StandardCharsets.UTF_8), 1, true));
105         futures.add(embeddedConnection.publish(TEST_TOPIC + "/2", "testPayload2".getBytes(StandardCharsets.UTF_8), 1, true));
106
107         CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(1000, TimeUnit.MILLISECONDS);
108
109         CountDownLatch c = new CountDownLatch(2);
110         futures.clear();
111         futures.add(clientConnection.subscribe(TEST_TOPIC + "/1", (topic, payload) -> c.countDown()));
112         futures.add(clientConnection.subscribe(TEST_TOPIC + "/2", (topic, payload) -> c.countDown()));
113         CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(1000, TimeUnit.MILLISECONDS);
114
115         assertTrue(c.await(1000, TimeUnit.MILLISECONDS));
116     }
117
118     @Test
119     public void multipleTopicsWithHashWildcardSubscription()
120             throws InterruptedException, ExecutionException, TimeoutException {
121         List<CompletableFuture<Boolean>> futures = new ArrayList<>();
122
123         futures.add(embeddedConnection.publish(TEST_TOPIC + "/1", "testPayload1".getBytes(StandardCharsets.UTF_8), 1, true));
124         futures.add(embeddedConnection.publish(TEST_TOPIC + "/2", "testPayload2".getBytes(StandardCharsets.UTF_8), 1, true));
125
126         CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(1000, TimeUnit.MILLISECONDS);
127
128         CountDownLatch c = new CountDownLatch(2);
129         futures.clear();
130         futures.add(clientConnection.subscribe(TEST_TOPIC + "/#", (topic, payload) -> c.countDown()));
131         CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(1000, TimeUnit.MILLISECONDS);
132
133         assertTrue(c.await(1000, TimeUnit.MILLISECONDS));
134     }
135
136     @Test
137     public void multipleTopicsWithPlusWildcardSubscription()
138             throws InterruptedException, ExecutionException, TimeoutException {
139         List<CompletableFuture<Boolean>> futures = new ArrayList<>();
140
141         futures.add(embeddedConnection.publish(TEST_TOPIC + "/1", "testPayload1".getBytes(StandardCharsets.UTF_8), 1, true));
142         futures.add(embeddedConnection.publish(TEST_TOPIC + "/2", "testPayload2".getBytes(StandardCharsets.UTF_8), 1, true));
143
144         CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(1000, TimeUnit.MILLISECONDS);
145
146         CountDownLatch c = new CountDownLatch(2);
147         futures.clear();
148         futures.add(clientConnection.subscribe(TEST_TOPIC + "/+", (topic, payload) -> c.countDown()));
149         CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(1000, TimeUnit.MILLISECONDS);
150
151         assertTrue(c.await(1000, TimeUnit.MILLISECONDS));
152     }
153 }