2 * Copyright (c) 2010-2021 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.mqtt;
15 import static org.junit.jupiter.api.Assertions.assertTrue;
17 import java.io.IOException;
18 import java.util.Dictionary;
19 import java.util.Hashtable;
20 import java.util.concurrent.Semaphore;
21 import java.util.concurrent.TimeUnit;
23 import org.eclipse.jdt.annotation.NonNullByDefault;
24 import org.eclipse.jdt.annotation.Nullable;
25 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
26 import org.openhab.core.io.transport.mqtt.MqttConnectionObserver;
27 import org.openhab.core.io.transport.mqtt.MqttConnectionState;
28 import org.openhab.core.io.transport.mqtt.MqttService;
29 import org.openhab.core.io.transport.mqtt.MqttServiceObserver;
30 import org.osgi.service.cm.Configuration;
31 import org.osgi.service.cm.ConfigurationAdmin;
 * Helper for MQTT integration tests: reconfigures the port of the embedded MQTT broker and waits until the embedded
 * broker connection provided by the {@link MqttService} is connected.
37 * @author David Graeff - Initial contribution
38 * @author Wouter Born - Support running MQTT itests in parallel by reconfiguring embedded broker port
41 public class EmbeddedBrokerTools {
/**
 * Port written to the embedded broker configuration. Taken from the {@code mqttembeddedbroker.port} system
 * property, falling back to 1883 when the property is not set.
 */
private static final int BROKER_PORT = Integer.getInteger("mqttembeddedbroker.port", 1883);

/** Used to read and update the embedded broker configuration. */
private final ConfigurationAdmin configurationAdmin;
/** Service the embedded broker connection is requested from. */
private final MqttService mqttService;

/** The embedded broker connection; stays {@code null} until {@link #waitForConnection()} has obtained it. */
public @Nullable MqttBrokerConnection embeddedConnection;

/**
 * Creates the helper on top of the given services.
 *
 * @param configurationAdmin the configuration admin holding the embedded broker configuration
 * @param mqttService the MQTT service that provides the embedded broker connection
 */
public EmbeddedBrokerTools(ConfigurationAdmin configurationAdmin, MqttService mqttService) {
    this.mqttService = mqttService;
    this.configurationAdmin = configurationAdmin;
}
56 * Request the embedded broker connection from the {@link MqttService} and wait for a connection to be established.
58 * @throws InterruptedException
61 public MqttBrokerConnection waitForConnection() throws InterruptedException, IOException {
64 embeddedConnection = mqttService.getBrokerConnection(Constants.CLIENTID);
65 if (embeddedConnection == null) {
66 Semaphore semaphore = new Semaphore(1);
68 MqttServiceObserver observer = new MqttServiceObserver() {
71 public void brokerAdded(String brokerID, MqttBrokerConnection broker) {
72 if (brokerID.equals(Constants.CLIENTID)) {
73 embeddedConnection = broker;
79 public void brokerRemoved(String brokerID, MqttBrokerConnection broker) {
82 mqttService.addBrokersListener(observer);
83 assertTrue(semaphore.tryAcquire(5, TimeUnit.SECONDS), "Wait for embedded connection client failed");
85 MqttBrokerConnection embeddedConnection = this.embeddedConnection;
86 if (embeddedConnection == null) {
87 throw new IllegalStateException();
90 Semaphore semaphore = new Semaphore(1);
92 MqttConnectionObserver mqttConnectionObserver = (state, error) -> {
93 if (state == MqttConnectionState.CONNECTED) {
97 embeddedConnection.addConnectionObserver(mqttConnectionObserver);
98 if (embeddedConnection.connectionState() == MqttConnectionState.CONNECTED) {
101 assertTrue(semaphore.tryAcquire(5, TimeUnit.SECONDS), "Connection " + embeddedConnection.getClientId()
102 + " failed. State: " + embeddedConnection.connectionState());
103 return embeddedConnection;
106 public void reconfigurePort() throws IOException {
107 Configuration configuration = configurationAdmin.getConfiguration(Constants.PID, null);
109 Dictionary<String, Object> properties = configuration.getProperties();
110 if (properties == null) {
111 properties = new Hashtable<>();
114 Integer currentPort = (Integer) properties.get(Constants.PORT);
115 if (currentPort == null || currentPort.intValue() != BROKER_PORT) {
116 properties.put(Constants.PORT, BROKER_PORT);
117 configuration.update(properties);
118 // Remove the connection to make sure the test waits for the new connection to become available
119 mqttService.removeBrokerConnection(Constants.CLIENTID);