2 * Copyright (c) 2010-2020 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.io.mqttembeddedbroker;
15 import static org.hamcrest.CoreMatchers.is;
16 import static org.junit.Assert.*;
17 import static org.mockito.MockitoAnnotations.initMocks;
19 import java.nio.charset.StandardCharsets;
20 import java.util.ArrayList;
21 import java.util.List;
22 import java.util.concurrent.CompletableFuture;
23 import java.util.concurrent.CountDownLatch;
24 import java.util.concurrent.ExecutionException;
25 import java.util.concurrent.TimeUnit;
26 import java.util.concurrent.TimeoutException;
28 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
29 import org.openhab.core.io.transport.mqtt.MqttConnectionObserver;
30 import org.openhab.core.io.transport.mqtt.MqttConnectionState;
31 import org.openhab.core.io.transport.mqtt.MqttService;
32 import org.openhab.core.test.java.JavaOSGiTest;
33 import org.junit.After;
34 import org.junit.Before;
35 import org.junit.Test;
40 * @author Jan N. Klug - Initial contribution
42 public class MoquetteTest extends JavaOSGiTest {
43 private static final String TEST_TOPIC = "testtopic";
45 private MqttService mqttService;
46 private MqttBrokerConnection embeddedConnection;
47 private MqttBrokerConnection clientConnection;
50 * Create an observer that fails the test as soon as the broker client connection changes its connection state
51 * to something else then CONNECTED.
53 private MqttConnectionObserver failIfChange = (state, error) -> assertThat(state,
54 is(MqttConnectionState.CONNECTED));
57 public void setUp() throws InterruptedException, ExecutionException, TimeoutException {
58 registerVolatileStorageService();
60 mqttService = getService(MqttService.class);
62 // Wait for the EmbeddedBrokerService internal connection to be connected
63 embeddedConnection = new EmbeddedBrokerTools().waitForConnection(mqttService);
64 embeddedConnection.setQos(1);
66 clientConnection = new MqttBrokerConnection(embeddedConnection.getHost(), embeddedConnection.getPort(),
67 embeddedConnection.isSecure(), "client");
68 clientConnection.setQos(1);
69 clientConnection.start().get(500, TimeUnit.MILLISECONDS);
70 assertThat(clientConnection.connectionState(), is(MqttConnectionState.CONNECTED));
71 // If the connection state changes in between -> fail
72 clientConnection.addConnectionObserver(failIfChange);
76 public void tearDown() throws InterruptedException, ExecutionException, TimeoutException {
77 if (clientConnection != null) {
78 clientConnection.removeConnectionObserver(failIfChange);
79 clientConnection.stop().get(500, TimeUnit.MILLISECONDS);
84 public void singleTopic() throws InterruptedException, ExecutionException, TimeoutException {
85 List<CompletableFuture<Boolean>> futures = new ArrayList<>();
87 futures.add(embeddedConnection.publish(TEST_TOPIC, "testPayload".getBytes(StandardCharsets.UTF_8), 1, true));
89 CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(1000, TimeUnit.MILLISECONDS);
91 CountDownLatch c = new CountDownLatch(1);
93 futures.add(clientConnection.subscribe(TEST_TOPIC, (topic, payload) -> c.countDown()));
94 CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(1000, TimeUnit.MILLISECONDS);
96 assertTrue(c.await(1000, TimeUnit.MILLISECONDS));
100 public void multipleTopicsWithSingleSubscription()
101 throws InterruptedException, ExecutionException, TimeoutException {
102 List<CompletableFuture<Boolean>> futures = new ArrayList<>();
104 futures.add(embeddedConnection.publish(TEST_TOPIC + "/1", "testPayload1".getBytes(StandardCharsets.UTF_8), 1, true));
105 futures.add(embeddedConnection.publish(TEST_TOPIC + "/2", "testPayload2".getBytes(StandardCharsets.UTF_8), 1, true));
107 CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(1000, TimeUnit.MILLISECONDS);
109 CountDownLatch c = new CountDownLatch(2);
111 futures.add(clientConnection.subscribe(TEST_TOPIC + "/1", (topic, payload) -> c.countDown()));
112 futures.add(clientConnection.subscribe(TEST_TOPIC + "/2", (topic, payload) -> c.countDown()));
113 CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(1000, TimeUnit.MILLISECONDS);
115 assertTrue(c.await(1000, TimeUnit.MILLISECONDS));
119 public void multipleTopicsWithHashWildcardSubscription()
120 throws InterruptedException, ExecutionException, TimeoutException {
121 List<CompletableFuture<Boolean>> futures = new ArrayList<>();
123 futures.add(embeddedConnection.publish(TEST_TOPIC + "/1", "testPayload1".getBytes(StandardCharsets.UTF_8), 1, true));
124 futures.add(embeddedConnection.publish(TEST_TOPIC + "/2", "testPayload2".getBytes(StandardCharsets.UTF_8), 1, true));
126 CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(1000, TimeUnit.MILLISECONDS);
128 CountDownLatch c = new CountDownLatch(2);
130 futures.add(clientConnection.subscribe(TEST_TOPIC + "/#", (topic, payload) -> c.countDown()));
131 CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(1000, TimeUnit.MILLISECONDS);
133 assertTrue(c.await(1000, TimeUnit.MILLISECONDS));
137 public void multipleTopicsWithPlusWildcardSubscription()
138 throws InterruptedException, ExecutionException, TimeoutException {
139 List<CompletableFuture<Boolean>> futures = new ArrayList<>();
141 futures.add(embeddedConnection.publish(TEST_TOPIC + "/1", "testPayload1".getBytes(StandardCharsets.UTF_8), 1, true));
142 futures.add(embeddedConnection.publish(TEST_TOPIC + "/2", "testPayload2".getBytes(StandardCharsets.UTF_8), 1, true));
144 CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(1000, TimeUnit.MILLISECONDS);
146 CountDownLatch c = new CountDownLatch(2);
148 futures.add(clientConnection.subscribe(TEST_TOPIC + "/+", (topic, payload) -> c.countDown()));
149 CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(1000, TimeUnit.MILLISECONDS);
151 assertTrue(c.await(1000, TimeUnit.MILLISECONDS));