]> git.basschouten.com Git - openhab-addons.git/blob
f7e10c4681d45c7c1df529ab39fea5ddeb9b47d4
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2022 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.mqtt;
14
15 import static org.junit.jupiter.api.Assertions.assertTrue;
16
17 import java.io.IOException;
18 import java.util.Dictionary;
19 import java.util.Hashtable;
20 import java.util.concurrent.Semaphore;
21 import java.util.concurrent.TimeUnit;
22
23 import org.eclipse.jdt.annotation.NonNullByDefault;
24 import org.eclipse.jdt.annotation.Nullable;
25 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
26 import org.openhab.core.io.transport.mqtt.MqttConnectionObserver;
27 import org.openhab.core.io.transport.mqtt.MqttConnectionState;
28 import org.openhab.core.io.transport.mqtt.MqttService;
29 import org.openhab.core.io.transport.mqtt.MqttServiceObserver;
30 import org.osgi.service.cm.Configuration;
31 import org.osgi.service.cm.ConfigurationAdmin;
32
33 /**
34  * A full implementation test, that starts the embedded MQTT broker and publishes a homeassistant MQTT discovery device
35  * tree.
36  *
37  * @author David Graeff - Initial contribution
38  * @author Wouter Born - Support running MQTT itests in parallel by reconfiguring embedded broker port
39  */
40 @NonNullByDefault
41 public class EmbeddedBrokerTools {
42
43     private static final int BROKER_PORT = Integer.getInteger("mqttembeddedbroker.port", 1883);
44
45     private final ConfigurationAdmin configurationAdmin;
46     private final MqttService mqttService;
47
48     public @Nullable MqttBrokerConnection embeddedConnection;
49
50     public EmbeddedBrokerTools(ConfigurationAdmin configurationAdmin, MqttService mqttService) {
51         this.configurationAdmin = configurationAdmin;
52         this.mqttService = mqttService;
53     }
54
55     /**
56      * Request the embedded broker connection from the {@link MqttService} and wait for a connection to be established.
57      *
58      * @throws InterruptedException
59      * @throws IOException
60      */
61     public MqttBrokerConnection waitForConnection() throws InterruptedException, IOException {
62         reconfigurePort();
63
64         embeddedConnection = mqttService.getBrokerConnection(Constants.CLIENTID);
65         if (embeddedConnection == null) {
66             Semaphore semaphore = new Semaphore(1);
67             semaphore.acquire();
68             MqttServiceObserver observer = new MqttServiceObserver() {
69
70                 @Override
71                 public void brokerAdded(String brokerID, MqttBrokerConnection broker) {
72                     if (brokerID.equals(Constants.CLIENTID)) {
73                         embeddedConnection = broker;
74                         semaphore.release();
75                     }
76                 }
77
78                 @Override
79                 public void brokerRemoved(String brokerID, MqttBrokerConnection broker) {
80                 }
81             };
82             mqttService.addBrokersListener(observer);
83             assertTrue(semaphore.tryAcquire(5, TimeUnit.SECONDS), "Wait for embedded connection client failed");
84         }
85         MqttBrokerConnection embeddedConnection = this.embeddedConnection;
86         if (embeddedConnection == null) {
87             throw new IllegalStateException();
88         }
89
90         Semaphore semaphore = new Semaphore(1);
91         semaphore.acquire();
92         MqttConnectionObserver mqttConnectionObserver = (state, error) -> {
93             if (state == MqttConnectionState.CONNECTED) {
94                 semaphore.release();
95             }
96         };
97         embeddedConnection.addConnectionObserver(mqttConnectionObserver);
98         if (embeddedConnection.connectionState() == MqttConnectionState.CONNECTED) {
99             semaphore.release();
100         }
101         assertTrue(semaphore.tryAcquire(5, TimeUnit.SECONDS), "Connection " + embeddedConnection.getClientId()
102                 + " failed. State: " + embeddedConnection.connectionState());
103         return embeddedConnection;
104     }
105
106     public void reconfigurePort() throws IOException {
107         Configuration configuration = configurationAdmin.getConfiguration(Constants.PID, null);
108
109         Dictionary<String, Object> properties = configuration.getProperties();
110         if (properties == null) {
111             properties = new Hashtable<>();
112         }
113
114         Integer currentPort = (Integer) properties.get(Constants.PORT);
115         if (currentPort == null || currentPort.intValue() != BROKER_PORT) {
116             properties.put(Constants.PORT, BROKER_PORT);
117             configuration.update(properties);
118             // Remove the connection to make sure the test waits for the new connection to become available
119             mqttService.removeBrokerConnection(Constants.CLIENTID);
120         }
121     }
122 }