]> git.basschouten.com Git - openhab-addons.git/blob
cc41838a06323ad433d7c0848b8bcf71fe960945
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2020 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.io.mqttembeddedbroker;
14
15 import static org.junit.jupiter.api.Assertions.assertTrue;
16
17 import java.util.concurrent.Semaphore;
18 import java.util.concurrent.TimeUnit;
19
20 import org.eclipse.jdt.annotation.NonNullByDefault;
21 import org.eclipse.jdt.annotation.Nullable;
22 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
23 import org.openhab.core.io.transport.mqtt.MqttConnectionObserver;
24 import org.openhab.core.io.transport.mqtt.MqttConnectionState;
25 import org.openhab.core.io.transport.mqtt.MqttService;
26 import org.openhab.core.io.transport.mqtt.MqttServiceObserver;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
29
30 /**
31  * A full implementation test, that starts the embedded MQTT broker and publishes a homeassistant MQTT discovery device
32  * tree.
33  *
34  * @author David Graeff - Initial contribution
35  */
36 @NonNullByDefault
37 public class EmbeddedBrokerTools {
38     private final Logger logger = LoggerFactory.getLogger(EmbeddedBrokerTools.class);
39     public @Nullable MqttBrokerConnection embeddedConnection = null;
40
41     /**
42      * Request the embedded broker connection from the {@link MqttService} and wait for a connection to be established.
43      *
44      * @throws InterruptedException
45      */
46     public MqttBrokerConnection waitForConnection(MqttService mqttService) throws InterruptedException {
47         embeddedConnection = mqttService.getBrokerConnection(Constants.CLIENTID);
48         if (embeddedConnection == null) {
49             Semaphore semaphore = new Semaphore(1);
50             semaphore.acquire();
51             MqttServiceObserver observer = new MqttServiceObserver() {
52
53                 @Override
54                 public void brokerAdded(String brokerID, MqttBrokerConnection broker) {
55                     if (brokerID.equals(Constants.CLIENTID)) {
56                         embeddedConnection = broker;
57                         semaphore.release();
58                     }
59                 }
60
61                 @Override
62                 public void brokerRemoved(String brokerID, MqttBrokerConnection broker) {
63                 }
64             };
65             mqttService.addBrokersListener(observer);
66             assertTrue(semaphore.tryAcquire(700, TimeUnit.MILLISECONDS), "Wait for embedded connection client failed");
67         }
68         MqttBrokerConnection embeddedConnection = this.embeddedConnection;
69         if (embeddedConnection == null) {
70             throw new IllegalStateException();
71         }
72
73         logger.warn("waitForConnection {}", embeddedConnection.connectionState());
74         Semaphore semaphore = new Semaphore(1);
75         semaphore.acquire();
76         MqttConnectionObserver mqttConnectionObserver = (state, error) -> {
77             if (state == MqttConnectionState.CONNECTED) {
78                 semaphore.release();
79             }
80         };
81         embeddedConnection.addConnectionObserver(mqttConnectionObserver);
82         if (embeddedConnection.connectionState() == MqttConnectionState.CONNECTED) {
83             semaphore.release();
84         }
85         assertTrue(semaphore.tryAcquire(500, TimeUnit.MILLISECONDS), "Connection " + embeddedConnection.getClientId()
86                 + " failed. State: " + embeddedConnection.connectionState());
87         return embeddedConnection;
88     }
89 }