import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.mockito.ArgumentMatchers.*;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;
import java.util.Collections;
import org.openhab.binding.mqtt.internal.MqttThingID;
import org.openhab.core.config.core.Configuration;
import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
-import org.openhab.core.io.transport.mqtt.MqttConnectionState;
import org.openhab.core.io.transport.mqtt.MqttException;
import org.openhab.core.io.transport.mqtt.MqttService;
import org.openhab.core.thing.Bridge;
@BeforeEach
public void setUp() {
- doReturn(MqttThingID.getThingUID(HOST, PORT)).when(thing).getUID();
doReturn(new Configuration(Collections.singletonMap("brokerid", MqttThingID.getThingUID(HOST, PORT).getId())))
.when(thing).getConfiguration();
handler = new SystemBrokerHandler(thing, service);
@Test
public void brokerAddedWrongID() throws ConfigurationException, MqttException {
MqttBrokerConnection brokerConnection = mock(MqttBrokerConnection.class);
- when(brokerConnection.connectionState()).thenReturn(MqttConnectionState.CONNECTED);
handler.brokerAdded("nonsense_id", brokerConnection);
assertNull(handler.connection);
// We do not expect a status change, because brokerAdded will do nothing with invalid connections.
public void brokerAdded() throws ConfigurationException, MqttException {
MqttBrokerConnectionEx connection = spy(
new MqttBrokerConnectionEx("10.10.0.10", 80, false, "BrokerHandlerTest"));
- doReturn(connection).when(service).getBrokerConnection(eq(handler.brokerID));
verify(callback, times(0)).statusUpdated(any(), any());
handler.brokerAdded(handler.brokerID, connection);
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
-import org.openhab.binding.mqtt.internal.MqttThingID;
import org.openhab.core.config.core.Configuration;
import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
import org.openhab.core.io.transport.mqtt.MqttConnectionState;
@BeforeEach
public void setUp() throws ConfigurationException, MqttException {
scheduler = new ScheduledThreadPoolExecutor(1);
- when(thing.getUID()).thenReturn(MqttThingID.getThingUID("10.10.0.10", 80));
connection = spy(new MqttBrokerConnectionEx("10.10.0.10", 80, false, "BrokerHandlerTest"));
connection.setTimeoutExecutor(scheduler, 10);
connection.setConnectionCallback(connection);
--- /dev/null
+/**
+ * Copyright (c) 2010-2020 Contributors to the openHAB project
+ *
+ * See the NOTICE file(s) distributed with this work for additional
+ * information.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0
+ *
+ * SPDX-License-Identifier: EPL-2.0
+ */
+package org.openhab.binding.mqtt.handler;
+
+import java.util.concurrent.CompletableFuture;
+
+import org.eclipse.jdt.annotation.NonNullByDefault;
+import org.eclipse.jdt.annotation.Nullable;
+import org.openhab.core.io.transport.mqtt.MqttConnectionState;
+import org.openhab.core.io.transport.mqtt.MqttWillAndTestament;
+import org.openhab.core.io.transport.mqtt.internal.Subscription;
+import org.openhab.core.io.transport.mqtt.internal.client.MqttAsyncClientWrapper;
+
+import com.hivemq.client.mqtt.MqttClientState;
+
+/**
+ * We need an extended MqttAsyncClientWrapper, that will, in respect to the success flags of the connection, immediately
+ * succeed or fail with publish, subscribe, unsubscribe, connect, disconnect.
+ *
+ * @author Jochen Klein - Initial contribution
+ */
+@NonNullByDefault
+public class MqttAsyncClientWrapperEx extends MqttAsyncClientWrapper {
+
+ private final MqttBrokerConnectionEx connection;
+
+ public MqttAsyncClientWrapperEx(MqttBrokerConnectionEx connection) {
+ super();
+ this.connection = connection;
+ }
+
+ @Override
+ public CompletableFuture<?> connect(@Nullable MqttWillAndTestament lwt, int keepAliveInterval,
+ @Nullable String username, @Nullable String password) {
+ if (!connection.connectTimeout) {
+ connection.getCallback().onConnected(null);
+ connection.connectionStateOverwrite = MqttConnectionState.CONNECTED;
+ return CompletableFuture.completedFuture(null);
+ }
+ return new CompletableFuture<>();
+ }
+
+ @Override
+ public CompletableFuture<@Nullable Void> disconnect() {
+ if (connection.disconnectSuccess) {
+ connection.getCallback().onDisconnected(new Throwable("disconnect called"));
+ connection.connectionStateOverwrite = MqttConnectionState.DISCONNECTED;
+ return CompletableFuture.completedFuture(null);
+ }
+ return new CompletableFuture<>();
+ }
+
+ @Override
+ public MqttClientState getState() {
+ return MqttClientState.CONNECTED;
+ }
+
+ @Override
+ public CompletableFuture<?> publish(String topic, byte[] payload, boolean retain, int qos) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public CompletableFuture<?> subscribe(String topic, int qos, Subscription subscription) {
+ if (connection.subscribeSuccess) {
+ return CompletableFuture.completedFuture(null);
+ }
+ return CompletableFuture.failedFuture(new Throwable("subscription failed"));
+ }
+
+ @Override
+ public CompletableFuture<?> unsubscribe(String topic) {
+ if (connection.unsubscribeSuccess) {
+ return CompletableFuture.completedFuture(null);
+ }
+ return CompletableFuture.failedFuture(new Throwable("unsubscription failed"));
+ }
+}
*/
package org.openhab.binding.mqtt.handler;
-import static org.mockito.ArgumentMatchers.*;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.spy;
import java.util.Map;
-import java.util.concurrent.CompletableFuture;
import org.eclipse.jdt.annotation.NonNull;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.openhab.core.io.transport.mqtt.internal.Subscription;
import org.openhab.core.io.transport.mqtt.internal.client.MqttAsyncClientWrapper;
-import com.hivemq.client.mqtt.MqttClientState;
-
/**
* We need an extended MqttBrokerConnection to overwrite the protected `connectionCallbacks` with
* an instance that takes the mocked version of `MqttBrokerConnection` and overwrite the connection state.
return subscribers;
}
+ public ConnectionCallback getCallback() {
+ return connectionCallback;
+ }
+
@Override
protected MqttAsyncClientWrapper createClient() {
- MqttAsyncClientWrapper mockedClient = mock(MqttAsyncClientWrapper.class);
- // connect
- doAnswer(i -> {
- if (!connectTimeout) {
- connectionCallback.onConnected(null);
- connectionStateOverwrite = MqttConnectionState.CONNECTED;
- return CompletableFuture.completedFuture(null);
- }
- return new CompletableFuture<>();
- }).when(mockedClient).connect(any(), anyInt(), any(), any());
- doAnswer(i -> {
- if (disconnectSuccess) {
- connectionCallback.onDisconnected(new Throwable("disconnect called"));
- connectionStateOverwrite = MqttConnectionState.DISCONNECTED;
- return CompletableFuture.completedFuture(null);
- }
- return new CompletableFuture<>();
- }).when(mockedClient).disconnect();
- // subscribe
- doAnswer(i -> {
- if (subscribeSuccess) {
- return CompletableFuture.completedFuture(null);
- } else {
- CompletableFuture<Void> future = new CompletableFuture<>();
- future.completeExceptionally(new Throwable("subscription failed"));
- return future;
- }
- }).when(mockedClient).subscribe(any(), anyInt(), any());
- // unsubscribe
- doAnswer(i -> {
- if (unsubscribeSuccess) {
- return CompletableFuture.completedFuture(null);
- } else {
- CompletableFuture<Void> future = new CompletableFuture<>();
- future.completeExceptionally(new Throwable("unsubscription failed"));
- return future;
- }
- }).when(mockedClient).unsubscribe(any());
- // state
- doAnswer(i -> {
- return MqttClientState.CONNECTED;
- }).when(mockedClient).getState();
- return mockedClient;
+ return new MqttAsyncClientWrapperEx(this);
}
@Override
--- /dev/null
+/**
+ * Copyright (c) 2010-2020 Contributors to the openHAB project
+ *
+ * See the NOTICE file(s) distributed with this work for additional
+ * information.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0
+ *
+ * SPDX-License-Identifier: EPL-2.0
+ */
+package org.openhab.binding.mqtt.internal;
+
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.*;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+import org.openhab.binding.mqtt.discovery.MQTTTopicDiscoveryParticipant;
+import org.openhab.binding.mqtt.discovery.MQTTTopicDiscoveryService;
+import org.openhab.binding.mqtt.handler.BrokerHandler;
+import org.openhab.binding.mqtt.handler.BrokerHandlerEx;
+import org.openhab.binding.mqtt.handler.MqttBrokerConnectionEx;
+import org.openhab.core.config.core.Configuration;
+import org.openhab.core.io.transport.mqtt.MqttService;
+import org.openhab.core.thing.Bridge;
+import org.openhab.core.thing.binding.ThingHandlerCallback;
+
+/**
+ * Test cases for the {@link MQTTTopicDiscoveryService} service.
+ *
+ * @author David Graeff - Initial contribution
+ */
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.WARN)
+public class MQTTTopicDiscoveryServiceTest {
+ private ScheduledExecutorService scheduler;
+
+ private MqttBrokerHandlerFactory subject;
+
+ @Mock
+ private MqttService mqttService;
+
+ @Mock
+ private Bridge thing;
+
+ @Mock
+ private ThingHandlerCallback callback;
+
+ @Mock
+ MQTTTopicDiscoveryParticipant listener;
+
+ private MqttBrokerConnectionEx connection;
+
+ private BrokerHandler handler;
+
+ @BeforeEach
+ public void setUp() {
+ scheduler = new ScheduledThreadPoolExecutor(1);
+
+ when(thing.getUID()).thenReturn(MqttThingID.getThingUID("10.10.0.10", 80));
+ connection = spy(new MqttBrokerConnectionEx("10.10.0.10", 80, false, "BrokerHandlerTest"));
+ connection.setTimeoutExecutor(scheduler, 10);
+ connection.setConnectionCallback(connection);
+
+ Configuration config = new Configuration();
+ config.put("host", "10.10.0.10");
+ config.put("port", 80);
+ when(thing.getConfiguration()).thenReturn(config);
+
+ handler = spy(new BrokerHandlerEx(thing, connection));
+ handler.setCallback(callback);
+
+ subject = new MqttBrokerHandlerFactory(mqttService);
+ }
+
+ @AfterEach
+ public void tearDown() {
+ scheduler.shutdownNow();
+ }
+
+ @Test
+ public void firstSubscribeThenHandler() {
+ handler.initialize();
+ BrokerHandlerEx.verifyCreateBrokerConnection(handler, 1);
+
+ subject.subscribe(listener, "topic");
+ subject.createdHandler(handler);
+ assertThat(subject.discoveryTopics.get("topic"), hasItem(listener));
+ // Simulate receiving
+ final byte[] bytes = "TEST".getBytes();
+ connection.getSubscribers().get("topic").messageArrived("topic", bytes, false);
+ verify(listener).receivedMessage(eq(thing.getUID()), eq(connection), eq("topic"), eq(bytes));
+ }
+
+ @Test
+ public void firstHandlerThenSubscribe() {
+ handler.initialize();
+ BrokerHandlerEx.verifyCreateBrokerConnection(handler, 1);
+
+ subject.createdHandler(handler);
+ subject.subscribe(listener, "topic");
+ assertThat(subject.discoveryTopics.get("topic"), hasItem(listener));
+
+ // Simulate receiving
+ final byte[] bytes = "TEST".getBytes();
+ connection.getSubscribers().get("topic").messageArrived("topic", bytes, false);
+ verify(listener).receivedMessage(eq(thing.getUID()), eq(connection), eq("topic"), eq(bytes));
+ }
+
+ @Test
+ public void handlerInitializeAfterSubscribe() {
+ subject.createdHandler(handler);
+ subject.subscribe(listener, "topic");
+ assertThat(subject.discoveryTopics.get("topic"), hasItem(listener));
+
+ // Init handler -> create connection
+ handler.initialize();
+ BrokerHandlerEx.verifyCreateBrokerConnection(handler, 1);
+
+ // Simulate receiving
+ final byte[] bytes = "TEST".getBytes();
+ connection.getSubscribers().get("topic").messageArrived("topic", bytes, false);
+ verify(listener).receivedMessage(eq(thing.getUID()), eq(connection), eq("topic"), eq(bytes));
+ }
+
+ @Test
+ public void topicVanished() {
+ handler.initialize();
+ BrokerHandlerEx.verifyCreateBrokerConnection(handler, 1);
+
+ subject.createdHandler(handler);
+ subject.subscribe(listener, "topic");
+ assertThat(subject.discoveryTopics.get("topic"), hasItem(listener));
+
+ // Simulate receiving
+ final byte[] bytes = "".getBytes();
+ connection.getSubscribers().get("topic").messageArrived("topic", bytes, false);
+ verify(listener).topicVanished(eq(thing.getUID()), eq(connection), eq("topic"));
+ }
+}
+++ /dev/null
-/**
- * Copyright (c) 2010-2020 Contributors to the openHAB project
- *
- * See the NOTICE file(s) distributed with this work for additional
- * information.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License 2.0 which is available at
- * http://www.eclipse.org/legal/epl-2.0
- *
- * SPDX-License-Identifier: EPL-2.0
- */
-package org.openhab.binding.mqtt.internal;
-
-import static org.junit.Assert.assertTrue;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.*;
-
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-
-import org.openhab.core.config.core.Configuration;
-import org.openhab.core.thing.Bridge;
-import org.openhab.core.thing.binding.ThingHandlerCallback;
-import org.openhab.core.io.transport.mqtt.MqttException;
-import org.openhab.core.io.transport.mqtt.MqttService;
-import org.openhab.core.io.transport.mqtt.internal.Subscription;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-import org.openhab.binding.mqtt.discovery.MQTTTopicDiscoveryParticipant;
-import org.openhab.binding.mqtt.discovery.MQTTTopicDiscoveryService;
-import org.openhab.binding.mqtt.handler.BrokerHandler;
-import org.openhab.binding.mqtt.handler.BrokerHandlerEx;
-import org.openhab.binding.mqtt.handler.MqttBrokerConnectionEx;
-import org.osgi.service.cm.ConfigurationException;
-
-/**
- * Test cases for the {@link MQTTTopicDiscoveryService} service.
- *
- * @author David Graeff - Initial contribution
- */
-public class MQTTTopicDiscoveryServiceTest {
- private ScheduledExecutorService scheduler;
-
- private MqttBrokerHandlerFactory subject;
-
- @Mock
- private MqttService mqttService;
-
- @Mock
- private Bridge thing;
-
- @Mock
- private ThingHandlerCallback callback;
-
- @Mock
- MQTTTopicDiscoveryParticipant listener;
-
- private MqttBrokerConnectionEx connection;
-
- private BrokerHandler handler;
-
- @Before
- public void setUp() throws ConfigurationException, MqttException {
- scheduler = new ScheduledThreadPoolExecutor(1);
- MockitoAnnotations.initMocks(this);
-
- when(thing.getUID()).thenReturn(MqttThingID.getThingUID("10.10.0.10", 80));
- connection = spy(new MqttBrokerConnectionEx("10.10.0.10", 80, false, "BrokerHandlerTest"));
- connection.setTimeoutExecutor(scheduler, 10);
- connection.setConnectionCallback(connection);
-
- Configuration config = new Configuration();
- config.put("host", "10.10.0.10");
- config.put("port", 80);
- when(thing.getConfiguration()).thenReturn(config);
-
- handler = spy(new BrokerHandlerEx(thing, connection));
- handler.setCallback(callback);
-
- subject = new MqttBrokerHandlerFactory(mqttService);
- }
-
- @After
- public void tearDown() {
- scheduler.shutdownNow();
- }
-
- @Test
- public void firstSubscribeThenHandler() {
- handler.initialize();
- BrokerHandlerEx.verifyCreateBrokerConnection(handler, 1);
-
- subject.subscribe(listener, "topic");
- subject.createdHandler(handler);
- assertTrue(subject.discoveryTopics.get("topic").contains(listener));
- // Simulate receiving
- final byte[] bytes = "TEST".getBytes();
- connection.getSubscribers().get("topic").forEach(s -> s.processMessage("topic", bytes));
- verify(listener).receivedMessage(eq(thing.getUID()), eq(connection), eq("topic"), eq(bytes));
- }
-
- @Test
- public void firstHandlerThenSubscribe() {
- handler.initialize();
- BrokerHandlerEx.verifyCreateBrokerConnection(handler, 1);
-
- subject.createdHandler(handler);
- subject.subscribe(listener, "topic");
- assertTrue(subject.discoveryTopics.get("topic").contains(listener));
-
- // Simulate receiving
- final byte[] bytes = "TEST".getBytes();
- connection.getSubscribers().get("topic").forEach(s -> s.processMessage("topic", bytes));
- verify(listener).receivedMessage(eq(thing.getUID()), eq(connection), eq("topic"), eq(bytes));
- }
-
- @Test
- public void handlerInitializeAfterSubscribe() {
- subject.createdHandler(handler);
- subject.subscribe(listener, "topic");
- assertTrue(subject.discoveryTopics.get("topic").contains(listener));
-
- // Init handler -> create connection
- handler.initialize();
- BrokerHandlerEx.verifyCreateBrokerConnection(handler, 1);
-
- // Simulate receiving
- final byte[] bytes = "TEST".getBytes();
-
- connection.getSubscribers().getOrDefault("topic", new Subscription("topic"))
- .forEach(s -> s.processMessage("topic", bytes));
- verify(listener).receivedMessage(eq(thing.getUID()), eq(connection), eq("topic"), eq(bytes));
- }
-
- @Test
- public void topicVanished() {
- handler.initialize();
- BrokerHandlerEx.verifyCreateBrokerConnection(handler, 1);
-
- subject.createdHandler(handler);
- subject.subscribe(listener, "topic");
- assertTrue(subject.discoveryTopics.get("topic").contains(listener));
-
- // Simulate receiving
- final byte[] bytes = "".getBytes();
- connection.getSubscribers().getOrDefault("topic", new Subscription("topic"))
- .forEach(s -> s.processMessage("topic", bytes));
- verify(listener).topicVanished(eq(thing.getUID()), eq(connection), eq("topic"));
- }
-}
import java.util.Map;
import java.util.TreeMap;
-import javax.naming.ConfigurationException;
-
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
private @Mock DiscoveryListener discoverListener;
@BeforeEach
- public void initMocks() throws ConfigurationException {
+ public void initMocks() {
Map<String, MqttBrokerConnection> brokers = new TreeMap<>();
brokers.put("testname", new MqttBrokerConnection("tcp://123.123.123.123", null, false, null));
brokers.put("textual", new MqttBrokerConnection("tcp://123.123.123.123", null, true, null));
}
@Test
- public void testDiscovery() throws ConfigurationException {
+ public void testDiscovery() {
// Setting the MqttService will enable the background scanner
MqttServiceDiscoveryService d = new MqttServiceDiscoveryService();
d.addDiscoveryListener(discoverListener);