]> git.basschouten.com Git - openhab-addons.git/blob
de7e511ade1315c58aa35cb7c1aef12fcf0caf6b
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2020 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.mqtt;
14
15 import static org.junit.jupiter.api.Assertions.assertTrue;
16
17 import java.io.IOException;
18 import java.util.Dictionary;
19 import java.util.Hashtable;
20 import java.util.concurrent.Semaphore;
21 import java.util.concurrent.TimeUnit;
22
23 import org.eclipse.jdt.annotation.NonNullByDefault;
24 import org.eclipse.jdt.annotation.Nullable;
25 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
26 import org.openhab.core.io.transport.mqtt.MqttConnectionObserver;
27 import org.openhab.core.io.transport.mqtt.MqttConnectionState;
28 import org.openhab.core.io.transport.mqtt.MqttService;
29 import org.openhab.core.io.transport.mqtt.MqttServiceObserver;
30 import org.openhab.io.mqttembeddedbroker.Constants;
31 import org.osgi.service.cm.Configuration;
32 import org.osgi.service.cm.ConfigurationAdmin;
33
34 /**
35  * A full implementation test, that starts the embedded MQTT broker and publishes a homeassistant MQTT discovery device
36  * tree.
37  *
38  * @author David Graeff - Initial contribution
39  * @author Wouter Born - Support running MQTT itests in parallel by reconfiguring embedded broker port
40  */
41 @NonNullByDefault
42 public class EmbeddedBrokerTools {
43
44     private static final int BROKER_PORT = Integer.getInteger("mqttembeddedbroker.port", 1883);
45
46     private final ConfigurationAdmin configurationAdmin;
47     private final MqttService mqttService;
48
49     public @Nullable MqttBrokerConnection embeddedConnection;
50
51     public EmbeddedBrokerTools(ConfigurationAdmin configurationAdmin, MqttService mqttService) {
52         this.configurationAdmin = configurationAdmin;
53         this.mqttService = mqttService;
54     }
55
56     /**
57      * Request the embedded broker connection from the {@link MqttService} and wait for a connection to be established.
58      *
59      * @throws InterruptedException
60      * @throws IOException
61      */
62     public MqttBrokerConnection waitForConnection() throws InterruptedException, IOException {
63         reconfigurePort();
64
65         embeddedConnection = mqttService.getBrokerConnection(Constants.CLIENTID);
66         if (embeddedConnection == null) {
67             Semaphore semaphore = new Semaphore(1);
68             semaphore.acquire();
69             MqttServiceObserver observer = new MqttServiceObserver() {
70
71                 @Override
72                 public void brokerAdded(String brokerID, MqttBrokerConnection broker) {
73                     if (brokerID.equals(Constants.CLIENTID)) {
74                         embeddedConnection = broker;
75                         semaphore.release();
76                     }
77                 }
78
79                 @Override
80                 public void brokerRemoved(String brokerID, MqttBrokerConnection broker) {
81                 }
82             };
83             mqttService.addBrokersListener(observer);
84             assertTrue(semaphore.tryAcquire(5, TimeUnit.SECONDS), "Wait for embedded connection client failed");
85         }
86         MqttBrokerConnection embeddedConnection = this.embeddedConnection;
87         if (embeddedConnection == null) {
88             throw new IllegalStateException();
89         }
90
91         Semaphore semaphore = new Semaphore(1);
92         semaphore.acquire();
93         MqttConnectionObserver mqttConnectionObserver = (state, error) -> {
94             if (state == MqttConnectionState.CONNECTED) {
95                 semaphore.release();
96             }
97         };
98         embeddedConnection.addConnectionObserver(mqttConnectionObserver);
99         if (embeddedConnection.connectionState() == MqttConnectionState.CONNECTED) {
100             semaphore.release();
101         }
102         assertTrue(semaphore.tryAcquire(5, TimeUnit.SECONDS), "Connection " + embeddedConnection.getClientId()
103                 + " failed. State: " + embeddedConnection.connectionState());
104         return embeddedConnection;
105     }
106
107     public void reconfigurePort() throws IOException {
108         Configuration configuration = configurationAdmin.getConfiguration(Constants.PID, null);
109
110         Dictionary<String, Object> properties = configuration.getProperties();
111         if (properties == null) {
112             properties = new Hashtable<>();
113         }
114
115         Integer currentPort = (Integer) properties.get(Constants.PORT);
116         if (currentPort == null || currentPort.intValue() != BROKER_PORT) {
117             properties.put(Constants.PORT, BROKER_PORT);
118             configuration.update(properties);
119             // Remove the connection to make sure the test waits for the new connection to become available
120             mqttService.removeBrokerConnection(Constants.CLIENTID);
121         }
122     }
123 }