]> git.basschouten.com Git - openhab-addons.git/commitdiff
Fix parallel MQTT itests execution (#8617)
authorWouter Born <github@maindrain.net>
Wed, 30 Sep 2020 17:36:47 +0000 (19:36 +0200)
committerGitHub <noreply@github.com>
Wed, 30 Sep 2020 17:36:47 +0000 (19:36 +0200)
* Improve exception handling of the embedded MQTT broker so the port can be reconfigured when it is already bound and it properly unlocks files
* Rework MQTT integration tests so they each run the embedded broker on their own reserved port

Signed-off-by: Wouter Born <github@maindrain.net>
15 files changed:
bundles/org.openhab.io.mqttembeddedbroker/pom.xml
bundles/org.openhab.io.mqttembeddedbroker/src/main/java/org/openhab/io/mqttembeddedbroker/Constants.java
bundles/org.openhab.io.mqttembeddedbroker/src/main/java/org/openhab/io/mqttembeddedbroker/internal/EmbeddedBrokerService.java
itests/org.openhab.binding.mqtt.homeassistant.tests/itest.bndrun
itests/org.openhab.binding.mqtt.homeassistant.tests/pom.xml
itests/org.openhab.binding.mqtt.homeassistant.tests/src/main/java/org/openhab/binding/mqtt/EmbeddedBrokerTools.java
itests/org.openhab.binding.mqtt.homeassistant.tests/src/main/java/org/openhab/binding/mqtt/HomeAssistantMQTTImplementationTest.java
itests/org.openhab.binding.mqtt.homie.tests/itest.bndrun
itests/org.openhab.binding.mqtt.homie.tests/pom.xml
itests/org.openhab.binding.mqtt.homie.tests/src/main/java/org/openhab/binding/mqtt/EmbeddedBrokerTools.java
itests/org.openhab.binding.mqtt.homie.tests/src/main/java/org/openhab/binding/mqtt/HomieImplementationTest.java
itests/org.openhab.io.mqttembeddedbroker.tests/itest.bndrun
itests/org.openhab.io.mqttembeddedbroker.tests/pom.xml
itests/org.openhab.io.mqttembeddedbroker.tests/src/main/java/org/openhab/io/mqttembeddedbroker/EmbeddedBrokerTools.java
itests/org.openhab.io.mqttembeddedbroker.tests/src/main/java/org/openhab/io/mqttembeddedbroker/MoquetteTest.java

index ae9f42ed16930c9132892b03e0f7d0f1ac786596..d733a11549faf3543d8cc20885cb0abc6ec66eb3 100644 (file)
     </dependency>
   </dependencies>
 
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>reserve-network-port</id>
+            <goals>
+              <goal>reserve-network-port</goal>
+            </goals>
+            <phase>process-resources</phase>
+            <configuration>
+              <portNames>
+                <portName>mqttembeddedbroker.port</portName>
+              </portNames>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
 </project>
index 9b888f8c6e8f6c16252c7c8dad8b1ead3a44112d..34b85e59082c7269d33d567bad124e967a58ac3d 100644 (file)
@@ -26,4 +26,14 @@ public class Constants {
      * </pre>
      */
     public static final String CLIENTID = "embedded-mqtt-broker";
+
+    /**
+     * The broker persistent identifier used for identifying configurations.
+     */
+    public static final String PID = "org.openhab.core.mqttembeddedbroker";
+
+    /**
+     * The configuration key used for configuring the embedded broker port.
+     */
+    public static final String PORT = "port";
 }
index bbdc533206fe3d86999ee2b57ed0e52c4ae7d023..a87cada201505a6c64b059c5d56b603c37505253 100644 (file)
@@ -79,8 +79,8 @@ import io.netty.handler.ssl.SslContextBuilder;
  *
  * @author David Graeff - Initial contribution
  */
-@Component(immediate = true, service = EmbeddedBrokerService.class, configurationPid = "org.openhab.core.mqttembeddedbroker", //
-        property = org.osgi.framework.Constants.SERVICE_PID + "=org.openhab.core.mqttembeddedbroker")
+@Component(immediate = true, service = EmbeddedBrokerService.class, configurationPid = Constants.PID, //
+        property = org.osgi.framework.Constants.SERVICE_PID + "=" + Constants.PID)
 @ConfigurableService(category = "MQTT", label = "MQTT Embedded Broker", description_uri = "mqtt:mqttembeddedbroker")
 @NonNullByDefault
 public class EmbeddedBrokerService
@@ -309,7 +309,12 @@ public class EmbeddedBrokerService
                 // retry starting broker, if it fails again, don't catch exception
                 server.startServer(new MemoryConfig(properties), null, sslContextCreator, authentificator, authorizer);
             }
+        } catch (Exception e) {
+            logger.warn("Failed to start embedded MQTT server: {}", e.getMessage());
+            server.stopServer();
+            return;
         }
+
         this.server = server;
         server.addInterceptHandler(metrics);
         ScheduledExecutorService s = new ScheduledThreadPoolExecutor(1);
index bafc464171d4a169183a4ac4897a56f2962467ea..49b1796db1077f79b898de58b34b9533bdf5a685 100644 (file)
@@ -13,6 +13,10 @@ Fragment-Host: org.openhab.binding.mqtt.homeassistant
 -runblacklist: \
        bnd.identity;id='org.openhab.core.storage.json'
 
+-runvm: \
+       -Dio.netty.noUnsafe=true,\
+       -Dmqttembeddedbroker.port=${mqttembeddedbroker.port}
+
 #
 # done
 #
@@ -87,7 +91,4 @@ Fragment-Host: org.openhab.binding.mqtt.homeassistant
        org.openhab.core.transform;version='[3.0.0,3.0.1)',\
        org.openhab.io.mqttembeddedbroker;version='[3.0.0,3.0.1)',\
        org.opentest4j;version='[1.2.0,1.2.1)',\
-       org.reactivestreams.reactive-streams;version='[1.0.2,1.0.3)',\
-       moquette-broker;version='[0.13.0,0.13.1)'
-
--runvm: -Dio.netty.noUnsafe=true
+       org.reactivestreams.reactive-streams;version='[1.0.2,1.0.3)'
index 110ca397a2afe48fb84d1a1c81f9f7e2016044ae..2c8e11c8a84556e5dd5a05286cfe16ffe6fa51e2 100644 (file)
     </dependency>
   </dependencies>
 
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>reserve-network-port</id>
+            <goals>
+              <goal>reserve-network-port</goal>
+            </goals>
+            <phase>process-resources</phase>
+            <configuration>
+              <portNames>
+                <portName>mqttembeddedbroker.port</portName>
+              </portNames>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
 </project>
index c318ed535d17c2f64809e09ba37c53e550ba49f5..de7e511ade1315c58aa35cb7c1aef12fcf0caf6b 100644 (file)
@@ -14,6 +14,9 @@ package org.openhab.binding.mqtt;
 
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
+import java.io.IOException;
+import java.util.Dictionary;
+import java.util.Hashtable;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 
@@ -25,23 +28,40 @@ import org.openhab.core.io.transport.mqtt.MqttConnectionState;
 import org.openhab.core.io.transport.mqtt.MqttService;
 import org.openhab.core.io.transport.mqtt.MqttServiceObserver;
 import org.openhab.io.mqttembeddedbroker.Constants;
+import org.osgi.service.cm.Configuration;
+import org.osgi.service.cm.ConfigurationAdmin;
 
 /**
  * A full implementation test, that starts the embedded MQTT broker and publishes a homeassistant MQTT discovery device
  * tree.
  *
  * @author David Graeff - Initial contribution
+ * @author Wouter Born - Support running MQTT itests in parallel by reconfiguring embedded broker port
  */
 @NonNullByDefault
 public class EmbeddedBrokerTools {
-    public @Nullable MqttBrokerConnection embeddedConnection = null;
+
+    private static final int BROKER_PORT = Integer.getInteger("mqttembeddedbroker.port", 1883);
+
+    private final ConfigurationAdmin configurationAdmin;
+    private final MqttService mqttService;
+
+    public @Nullable MqttBrokerConnection embeddedConnection;
+
+    public EmbeddedBrokerTools(ConfigurationAdmin configurationAdmin, MqttService mqttService) {
+        this.configurationAdmin = configurationAdmin;
+        this.mqttService = mqttService;
+    }
 
     /**
      * Request the embedded broker connection from the {@link MqttService} and wait for a connection to be established.
      *
      * @throws InterruptedException
+     * @throws IOException
      */
-    public MqttBrokerConnection waitForConnection(MqttService mqttService) throws InterruptedException {
+    public MqttBrokerConnection waitForConnection() throws InterruptedException, IOException {
+        reconfigurePort();
+
         embeddedConnection = mqttService.getBrokerConnection(Constants.CLIENTID);
         if (embeddedConnection == null) {
             Semaphore semaphore = new Semaphore(1);
@@ -61,7 +81,7 @@ public class EmbeddedBrokerTools {
                 }
             };
             mqttService.addBrokersListener(observer);
-            assertTrue(semaphore.tryAcquire(1000, TimeUnit.MILLISECONDS), "Wait for embedded connection client failed");
+            assertTrue(semaphore.tryAcquire(5, TimeUnit.SECONDS), "Wait for embedded connection client failed");
         }
         MqttBrokerConnection embeddedConnection = this.embeddedConnection;
         if (embeddedConnection == null) {
@@ -79,8 +99,25 @@ public class EmbeddedBrokerTools {
         if (embeddedConnection.connectionState() == MqttConnectionState.CONNECTED) {
             semaphore.release();
         }
-        assertTrue(semaphore.tryAcquire(500, TimeUnit.MILLISECONDS), "Connection " + embeddedConnection.getClientId()
+        assertTrue(semaphore.tryAcquire(5, TimeUnit.SECONDS), "Connection " + embeddedConnection.getClientId()
                 + " failed. State: " + embeddedConnection.connectionState());
         return embeddedConnection;
     }
+
+    public void reconfigurePort() throws IOException {
+        Configuration configuration = configurationAdmin.getConfiguration(Constants.PID, null);
+
+        Dictionary<String, Object> properties = configuration.getProperties();
+        if (properties == null) {
+            properties = new Hashtable<>();
+        }
+
+        Integer currentPort = (Integer) properties.get(Constants.PORT);
+        if (currentPort == null || currentPort.intValue() != BROKER_PORT) {
+            properties.put(Constants.PORT, BROKER_PORT);
+            configuration.update(properties);
+            // Remove the connection to make sure the test waits for the new connection to become available
+            mqttService.removeBrokerConnection(Constants.CLIENTID);
+        }
+    }
 }
index a4ae5fc8d4c2ba91211202609f1c9b89ff271607..96aaf90bfe02ff767797441c47da1e66644d3177 100644 (file)
@@ -57,6 +57,7 @@ import org.openhab.core.test.java.JavaOSGiTest;
 import org.openhab.core.types.State;
 import org.openhab.core.types.UnDefType;
 import org.openhab.core.util.UIDUtils;
+import org.osgi.service.cm.ConfigurationAdmin;
 
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
@@ -69,6 +70,7 @@ import com.google.gson.GsonBuilder;
  */
 @NonNullByDefault
 public class HomeAssistantMQTTImplementationTest extends JavaOSGiTest {
+    private @NonNullByDefault({}) ConfigurationAdmin configurationAdmin;
     private @NonNullByDefault({}) MqttService mqttService;
     private @NonNullByDefault({}) MqttBrokerConnection embeddedConnection;
     private @NonNullByDefault({}) MqttBrokerConnection connection;
@@ -94,14 +96,15 @@ public class HomeAssistantMQTTImplementationTest extends JavaOSGiTest {
     public void beforeEach() throws Exception {
         registerVolatileStorageService();
         mocksCloseable = openMocks(this);
+        configurationAdmin = getService(ConfigurationAdmin.class);
         mqttService = getService(MqttService.class);
 
         // Wait for the EmbeddedBrokerService internal connection to be connected
-        embeddedConnection = new EmbeddedBrokerTools().waitForConnection(mqttService);
+        embeddedConnection = new EmbeddedBrokerTools(configurationAdmin, mqttService).waitForConnection();
 
         connection = new MqttBrokerConnection(embeddedConnection.getHost(), embeddedConnection.getPort(),
                 embeddedConnection.isSecure(), "ha_mqtt");
-        connection.start().get(1000, TimeUnit.MILLISECONDS);
+        connection.start().get(2, TimeUnit.SECONDS);
         assertThat(connection.connectionState(), is(MqttConnectionState.CONNECTED));
 
         // If the connection state changes in between -> fail
@@ -117,7 +120,7 @@ public class HomeAssistantMQTTImplementationTest extends JavaOSGiTest {
         futures.add(embeddedConnection.publish(testObjectTopic + "/state", "ON".getBytes(), 0, true));
 
         registeredTopics = futures.size();
-        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(1000, TimeUnit.MILLISECONDS);
+        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(2, TimeUnit.SECONDS);
 
         failure = null;
 
@@ -128,7 +131,7 @@ public class HomeAssistantMQTTImplementationTest extends JavaOSGiTest {
     public void afterEach() throws Exception {
         if (connection != null) {
             connection.removeConnectionObserver(failIfChange);
-            connection.stop().get(1000, TimeUnit.MILLISECONDS);
+            connection.stop().get(2, TimeUnit.SECONDS);
         }
 
         mocksCloseable.close();
@@ -137,18 +140,18 @@ public class HomeAssistantMQTTImplementationTest extends JavaOSGiTest {
     @Test
     public void reconnectTest() throws InterruptedException, ExecutionException, TimeoutException {
         connection.removeConnectionObserver(failIfChange);
-        connection.stop().get(2000, TimeUnit.MILLISECONDS);
+        connection.stop().get(2, TimeUnit.SECONDS);
         connection = new MqttBrokerConnection(embeddedConnection.getHost(), embeddedConnection.getPort(),
                 embeddedConnection.isSecure(), "ha_mqtt");
-        connection.start().get(2000, TimeUnit.MILLISECONDS);
+        connection.start().get(2, TimeUnit.SECONDS);
     }
 
     @Test
     public void retrieveAllTopics() throws InterruptedException, ExecutionException, TimeoutException {
         CountDownLatch c = new CountDownLatch(registeredTopics);
         connection.subscribe("homeassistant/+/+/" + ThingChannelConstants.testHomeAssistantThing.getId() + "/#",
-                (topic, payload) -> c.countDown()).get(1000, TimeUnit.MILLISECONDS);
-        assertTrue(c.await(1000, TimeUnit.MILLISECONDS),
+                (topic, payload) -> c.countDown()).get(2, TimeUnit.SECONDS);
+        assertTrue(c.await(2, TimeUnit.SECONDS),
                 "Connection " + connection.getClientId() + " not retrieving all topics");
     }
 
@@ -183,8 +186,8 @@ public class HomeAssistantMQTTImplementationTest extends JavaOSGiTest {
                     return null;
                 });
 
-        assertTrue(latch.await(4000, TimeUnit.MILLISECONDS));
-        future.get(2000, TimeUnit.MILLISECONDS);
+        assertTrue(latch.await(4, TimeUnit.SECONDS));
+        future.get(2, TimeUnit.SECONDS);
 
         // No failure expected and one discovered result
         assertNull(failure);
index e3be5b0f16df06dccc4476272e123b6c73a66b72..8d9ecadab7d97385580b6ea93429028611bdd9b9 100644 (file)
@@ -13,6 +13,10 @@ Fragment-Host: org.openhab.binding.mqtt.homie
 -runblacklist: \
        bnd.identity;id='org.openhab.core.storage.json'
 
+-runvm: \
+       -Dio.netty.noUnsafe=true,\
+       -Dmqttembeddedbroker.port=${mqttembeddedbroker.port}
+
 #
 # done
 #
@@ -89,4 +93,4 @@ Fragment-Host: org.openhab.binding.mqtt.homie
        org.opentest4j;version='[1.2.0,1.2.1)',\
        org.reactivestreams.reactive-streams;version='[1.0.2,1.0.3)',\
        moquette-broker;version='[0.13.0,0.13.1)'
--runvm: -Dio.netty.noUnsafe=true
+
index 90f6b77ece5ad47345d2bd47068cced83c1d5d8d..d6965a09999c9696dfa284538641875c36e29172 100644 (file)
@@ -82,7 +82,6 @@
       <groupId>com.h2database</groupId>
       <artifactId>h2-mvstore</artifactId>
       <version>1.4.199</version>
-
     </dependency>
     <dependency>
       <groupId>io.netty</groupId>
     </dependency>
   </dependencies>
 
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>reserve-network-port</id>
+            <goals>
+              <goal>reserve-network-port</goal>
+            </goals>
+            <phase>process-resources</phase>
+            <configuration>
+              <portNames>
+                <portName>mqttembeddedbroker.port</portName>
+              </portNames>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
 </project>
index ec69238753ff889f024d6e8348c29e0b4244b2b2..de7e511ade1315c58aa35cb7c1aef12fcf0caf6b 100644 (file)
@@ -14,6 +14,9 @@ package org.openhab.binding.mqtt;
 
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
+import java.io.IOException;
+import java.util.Dictionary;
+import java.util.Hashtable;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 
@@ -25,26 +28,40 @@ import org.openhab.core.io.transport.mqtt.MqttConnectionState;
 import org.openhab.core.io.transport.mqtt.MqttService;
 import org.openhab.core.io.transport.mqtt.MqttServiceObserver;
 import org.openhab.io.mqttembeddedbroker.Constants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.osgi.service.cm.Configuration;
+import org.osgi.service.cm.ConfigurationAdmin;
 
 /**
  * A full implementation test, that starts the embedded MQTT broker and publishes a homeassistant MQTT discovery device
  * tree.
  *
  * @author David Graeff - Initial contribution
+ * @author Wouter Born - Support running MQTT itests in parallel by reconfiguring embedded broker port
  */
 @NonNullByDefault
 public class EmbeddedBrokerTools {
-    private final Logger logger = LoggerFactory.getLogger(EmbeddedBrokerTools.class);
-    public @Nullable MqttBrokerConnection embeddedConnection = null;
+
+    private static final int BROKER_PORT = Integer.getInteger("mqttembeddedbroker.port", 1883);
+
+    private final ConfigurationAdmin configurationAdmin;
+    private final MqttService mqttService;
+
+    public @Nullable MqttBrokerConnection embeddedConnection;
+
+    public EmbeddedBrokerTools(ConfigurationAdmin configurationAdmin, MqttService mqttService) {
+        this.configurationAdmin = configurationAdmin;
+        this.mqttService = mqttService;
+    }
 
     /**
      * Request the embedded broker connection from the {@link MqttService} and wait for a connection to be established.
      *
      * @throws InterruptedException
+     * @throws IOException
      */
-    public MqttBrokerConnection waitForConnection(MqttService mqttService) throws InterruptedException {
+    public MqttBrokerConnection waitForConnection() throws InterruptedException, IOException {
+        reconfigurePort();
+
         embeddedConnection = mqttService.getBrokerConnection(Constants.CLIENTID);
         if (embeddedConnection == null) {
             Semaphore semaphore = new Semaphore(1);
@@ -64,14 +81,13 @@ public class EmbeddedBrokerTools {
                 }
             };
             mqttService.addBrokersListener(observer);
-            assertTrue(semaphore.tryAcquire(700, TimeUnit.MILLISECONDS), "Wait for embedded connection client failed");
+            assertTrue(semaphore.tryAcquire(5, TimeUnit.SECONDS), "Wait for embedded connection client failed");
         }
         MqttBrokerConnection embeddedConnection = this.embeddedConnection;
         if (embeddedConnection == null) {
             throw new IllegalStateException();
         }
 
-        logger.warn("waitForConnection {}", embeddedConnection.connectionState());
         Semaphore semaphore = new Semaphore(1);
         semaphore.acquire();
         MqttConnectionObserver mqttConnectionObserver = (state, error) -> {
@@ -83,8 +99,25 @@ public class EmbeddedBrokerTools {
         if (embeddedConnection.connectionState() == MqttConnectionState.CONNECTED) {
             semaphore.release();
         }
-        assertTrue(semaphore.tryAcquire(500, TimeUnit.MILLISECONDS), "Connection " + embeddedConnection.getClientId()
+        assertTrue(semaphore.tryAcquire(5, TimeUnit.SECONDS), "Connection " + embeddedConnection.getClientId()
                 + " failed. State: " + embeddedConnection.connectionState());
         return embeddedConnection;
     }
+
+    public void reconfigurePort() throws IOException {
+        Configuration configuration = configurationAdmin.getConfiguration(Constants.PID, null);
+
+        Dictionary<String, Object> properties = configuration.getProperties();
+        if (properties == null) {
+            properties = new Hashtable<>();
+        }
+
+        Integer currentPort = (Integer) properties.get(Constants.PORT);
+        if (currentPort == null || currentPort.intValue() != BROKER_PORT) {
+            properties.put(Constants.PORT, BROKER_PORT);
+            configuration.update(properties);
+            // Remove the connection to make sure the test waits for the new connection to become available
+            mqttService.removeBrokerConnection(Constants.CLIENTID);
+        }
+    }
 }
index 776f8fb91f852a94ee4abd9e4ee394323228ad66..816ec3b75880073c171f7ef50d23c076df6178bb 100644 (file)
@@ -59,6 +59,7 @@ import org.openhab.core.library.types.DecimalType;
 import org.openhab.core.library.types.OnOffType;
 import org.openhab.core.test.java.JavaOSGiTest;
 import org.openhab.core.types.UnDefType;
+import org.osgi.service.cm.ConfigurationAdmin;
 
 /**
  * A full implementation test, that starts the embedded MQTT broker and publishes a homie device tree.
@@ -71,6 +72,7 @@ public class HomieImplementationTest extends JavaOSGiTest {
     private static final String DEVICE_ID = ThingChannelConstants.testHomieThing.getId();
     private static final String DEVICE_TOPIC = BASE_TOPIC + "/" + DEVICE_ID;
 
+    private @NonNullByDefault({}) ConfigurationAdmin configurationAdmin;
     private @NonNullByDefault({}) MqttService mqttService;
     private @NonNullByDefault({}) MqttBrokerConnection embeddedConnection;
     private @NonNullByDefault({}) MqttBrokerConnection connection;
@@ -99,15 +101,17 @@ public class HomieImplementationTest extends JavaOSGiTest {
     public void beforeEach() throws Exception {
         registerVolatileStorageService();
         mocksCloseable = openMocks(this);
+        configurationAdmin = getService(ConfigurationAdmin.class);
         mqttService = getService(MqttService.class);
 
-        embeddedConnection = new EmbeddedBrokerTools().waitForConnection(mqttService);
+        // Wait for the EmbeddedBrokerService internal connection to be connected
+        embeddedConnection = new EmbeddedBrokerTools(configurationAdmin, mqttService).waitForConnection();
         embeddedConnection.setQos(1);
 
         connection = new MqttBrokerConnection(embeddedConnection.getHost(), embeddedConnection.getPort(),
                 embeddedConnection.isSecure(), "homie");
         connection.setQos(1);
-        connection.start().get(500, TimeUnit.MILLISECONDS);
+        connection.start().get(5, TimeUnit.SECONDS);
         assertThat(connection.connectionState(), is(MqttConnectionState.CONNECTED));
         // If the connection state changes in between -> fail
         connection.addConnectionObserver(failIfChange);
@@ -146,7 +150,7 @@ public class HomieImplementationTest extends JavaOSGiTest {
         futures.add(publish(propertyTestTopic + "/$datatype", "boolean"));
 
         registeredTopics = futures.size();
-        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(1000, TimeUnit.MILLISECONDS);
+        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(2, TimeUnit.SECONDS);
 
         scheduler = new ScheduledThreadPoolExecutor(6);
     }
@@ -159,9 +163,11 @@ public class HomieImplementationTest extends JavaOSGiTest {
     public void afterEach() throws Exception {
         if (connection != null) {
             connection.removeConnectionObserver(failIfChange);
-            connection.stop().get(500, TimeUnit.MILLISECONDS);
+            connection.stop().get(2, TimeUnit.SECONDS);
+        }
+        if (scheduler != null) {
+            scheduler.shutdownNow();
         }
-        scheduler.shutdownNow();
         mocksCloseable.close();
     }
 
@@ -169,9 +175,8 @@ public class HomieImplementationTest extends JavaOSGiTest {
     public void retrieveAllTopics() throws InterruptedException, ExecutionException, TimeoutException {
         // four topics are not under /testnode !
         CountDownLatch c = new CountDownLatch(registeredTopics - 4);
-        connection.subscribe(DEVICE_TOPIC + "/testnode/#", (topic, payload) -> c.countDown()).get(5000,
-                TimeUnit.MILLISECONDS);
-        assertTrue(c.await(5000, TimeUnit.MILLISECONDS),
+        connection.subscribe(DEVICE_TOPIC + "/testnode/#", (topic, payload) -> c.countDown()).get(5, TimeUnit.SECONDS);
+        assertTrue(c.await(5, TimeUnit.SECONDS),
                 "Connection " + connection.getClientId() + " not retrieving all topics ");
     }
 
index 82764488e874783cd7e4b9377fc7c452f1a4b348..e690bf692d5a1ea58392ff7c22c2475f847f50ff 100644 (file)
@@ -10,6 +10,10 @@ Fragment-Host: org.openhab.io.mqttembeddedbroker
 -runblacklist: \
        bnd.identity;id='org.openhab.core.storage.json'
 
+-runvm: \
+       -Dio.netty.noUnsafe=true,\
+       -Dmqttembeddedbroker.port=${mqttembeddedbroker.port}
+
 #
 # done
 #
@@ -73,4 +77,3 @@ Fragment-Host: org.openhab.io.mqttembeddedbroker
        org.opentest4j;version='[1.2.0,1.2.1)',\
        org.reactivestreams.reactive-streams;version='[1.0.2,1.0.3)',\
        moquette-broker;version='[0.13.0,0.13.1)'
--runvm: -Dio.netty.noUnsafe=true
index ac22da3434cbfb7488afe52682e9b542602664a4..3b7d7792d28152c63d88e9f71715ea2f86e2a545 100644 (file)
     </dependency>
   </dependencies>
 
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>reserve-network-port</id>
+            <goals>
+              <goal>reserve-network-port</goal>
+            </goals>
+            <phase>process-resources</phase>
+            <configuration>
+              <portNames>
+                <portName>mqttembeddedbroker.port</portName>
+              </portNames>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
 </project>
index cc41838a06323ad433d7c0848b8bcf71fe960945..9ebdf503cb1fdf2ae9a49de6871218c81573d471 100644 (file)
@@ -14,6 +14,9 @@ package org.openhab.io.mqttembeddedbroker;
 
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
+import java.io.IOException;
+import java.util.Dictionary;
+import java.util.Hashtable;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 
@@ -24,26 +27,40 @@ import org.openhab.core.io.transport.mqtt.MqttConnectionObserver;
 import org.openhab.core.io.transport.mqtt.MqttConnectionState;
 import org.openhab.core.io.transport.mqtt.MqttService;
 import org.openhab.core.io.transport.mqtt.MqttServiceObserver;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.osgi.service.cm.Configuration;
+import org.osgi.service.cm.ConfigurationAdmin;
 
 /**
  * A full implementation test, that starts the embedded MQTT broker and publishes a homeassistant MQTT discovery device
  * tree.
  *
  * @author David Graeff - Initial contribution
+ * @author Wouter Born - Support running MQTT itests in parallel by reconfiguring embedded broker port
  */
 @NonNullByDefault
 public class EmbeddedBrokerTools {
-    private final Logger logger = LoggerFactory.getLogger(EmbeddedBrokerTools.class);
-    public @Nullable MqttBrokerConnection embeddedConnection = null;
+
+    private static final int BROKER_PORT = Integer.getInteger("mqttembeddedbroker.port", 1883);
+
+    private final ConfigurationAdmin configurationAdmin;
+    private final MqttService mqttService;
+
+    public @Nullable MqttBrokerConnection embeddedConnection;
+
+    public EmbeddedBrokerTools(ConfigurationAdmin configurationAdmin, MqttService mqttService) {
+        this.configurationAdmin = configurationAdmin;
+        this.mqttService = mqttService;
+    }
 
     /**
      * Request the embedded broker connection from the {@link MqttService} and wait for a connection to be established.
      *
      * @throws InterruptedException
+     * @throws IOException
      */
-    public MqttBrokerConnection waitForConnection(MqttService mqttService) throws InterruptedException {
+    public MqttBrokerConnection waitForConnection() throws InterruptedException, IOException {
+        reconfigurePort();
+
         embeddedConnection = mqttService.getBrokerConnection(Constants.CLIENTID);
         if (embeddedConnection == null) {
             Semaphore semaphore = new Semaphore(1);
@@ -63,14 +80,13 @@ public class EmbeddedBrokerTools {
                 }
             };
             mqttService.addBrokersListener(observer);
-            assertTrue(semaphore.tryAcquire(700, TimeUnit.MILLISECONDS), "Wait for embedded connection client failed");
+            assertTrue(semaphore.tryAcquire(5, TimeUnit.SECONDS), "Wait for embedded connection client failed");
         }
         MqttBrokerConnection embeddedConnection = this.embeddedConnection;
         if (embeddedConnection == null) {
             throw new IllegalStateException();
         }
 
-        logger.warn("waitForConnection {}", embeddedConnection.connectionState());
         Semaphore semaphore = new Semaphore(1);
         semaphore.acquire();
         MqttConnectionObserver mqttConnectionObserver = (state, error) -> {
@@ -82,8 +98,25 @@ public class EmbeddedBrokerTools {
         if (embeddedConnection.connectionState() == MqttConnectionState.CONNECTED) {
             semaphore.release();
         }
-        assertTrue(semaphore.tryAcquire(500, TimeUnit.MILLISECONDS), "Connection " + embeddedConnection.getClientId()
+        assertTrue(semaphore.tryAcquire(5, TimeUnit.SECONDS), "Connection " + embeddedConnection.getClientId()
                 + " failed. State: " + embeddedConnection.connectionState());
         return embeddedConnection;
     }
+
+    public void reconfigurePort() throws IOException {
+        Configuration configuration = configurationAdmin.getConfiguration(Constants.PID, null);
+
+        Dictionary<String, Object> properties = configuration.getProperties();
+        if (properties == null) {
+            properties = new Hashtable<>();
+        }
+
+        Integer currentPort = (Integer) properties.get(Constants.PORT);
+        if (currentPort == null || currentPort.intValue() != BROKER_PORT) {
+            properties.put(Constants.PORT, BROKER_PORT);
+            configuration.update(properties);
+            // Remove the connection to make sure the test waits for the new connection to become available
+            mqttService.removeBrokerConnection(Constants.CLIENTID);
+        }
+    }
 }
index f15a4245e11d090635c36fbb568e4daca05662a7..a22a69477681dcb113c3859d6fd891abfd1dceb4 100644 (file)
@@ -35,6 +35,7 @@ import org.openhab.core.io.transport.mqtt.MqttConnectionObserver;
 import org.openhab.core.io.transport.mqtt.MqttConnectionState;
 import org.openhab.core.io.transport.mqtt.MqttService;
 import org.openhab.core.test.java.JavaOSGiTest;
+import org.osgi.service.cm.ConfigurationAdmin;
 
 /**
  * Moquette test
@@ -47,6 +48,7 @@ public class MoquetteTest extends JavaOSGiTest {
 
     private @NonNullByDefault({}) AutoCloseable mocksCloseable;
 
+    private @NonNullByDefault({}) ConfigurationAdmin configurationAdmin;
     private @NonNullByDefault({}) MqttService mqttService;
     private @NonNullByDefault({}) MqttBrokerConnection embeddedConnection;
     private @NonNullByDefault({}) MqttBrokerConnection clientConnection;
@@ -62,10 +64,11 @@ public class MoquetteTest extends JavaOSGiTest {
     public void beforeEach() throws Exception {
         registerVolatileStorageService();
         mocksCloseable = openMocks(this);
+        configurationAdmin = getService(ConfigurationAdmin.class);
         mqttService = getService(MqttService.class);
 
         // Wait for the EmbeddedBrokerService internal connection to be connected
-        embeddedConnection = new EmbeddedBrokerTools().waitForConnection(mqttService);
+        embeddedConnection = new EmbeddedBrokerTools(configurationAdmin, mqttService).waitForConnection();
         embeddedConnection.setQos(1);
 
         clientConnection = new MqttBrokerConnection(embeddedConnection.getHost(), embeddedConnection.getPort(),