]> git.basschouten.com Git - openhab-addons.git/blob
736e91c80661ee2bb1779267431d90d07be40f74
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2020 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.io.mqttembeddedbroker;
14
15 import static org.junit.jupiter.api.Assertions.assertTrue;
16
17 import java.util.concurrent.Semaphore;
18 import java.util.concurrent.TimeUnit;
19
20 import org.eclipse.jdt.annotation.NonNull;
21 import org.eclipse.jdt.annotation.NonNullByDefault;
22 import org.eclipse.jdt.annotation.Nullable;
23 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
24 import org.openhab.core.io.transport.mqtt.MqttConnectionObserver;
25 import org.openhab.core.io.transport.mqtt.MqttConnectionState;
26 import org.openhab.core.io.transport.mqtt.MqttService;
27 import org.openhab.core.io.transport.mqtt.MqttServiceObserver;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30
31 /**
32  * A full implementation test, that starts the embedded MQTT broker and publishes a homeassistant MQTT discovery device
33  * tree.
34  *
35  * @author David Graeff - Initial contribution
36  */
37 @NonNullByDefault
38 public class EmbeddedBrokerTools {
39     private final Logger logger = LoggerFactory.getLogger(EmbeddedBrokerTools.class);
40     public @Nullable MqttBrokerConnection embeddedConnection = null;
41
42     /**
43      * Request the embedded broker connection from the {@link MqttService} and wait for a connection to be established.
44      *
45      * @throws InterruptedException
46      */
47     public MqttBrokerConnection waitForConnection(MqttService mqttService) throws InterruptedException {
48         embeddedConnection = mqttService.getBrokerConnection(Constants.CLIENTID);
49         if (embeddedConnection == null) {
50             Semaphore semaphore = new Semaphore(1);
51             semaphore.acquire();
52             MqttServiceObserver observer = new MqttServiceObserver() {
53
54                 @Override
55                 public void brokerAdded(@NonNull String brokerID, @NonNull MqttBrokerConnection broker) {
56                     if (brokerID.equals(Constants.CLIENTID)) {
57                         embeddedConnection = broker;
58                         semaphore.release();
59                     }
60                 }
61
62                 @Override
63                 public void brokerRemoved(@NonNull String brokerID, @NonNull MqttBrokerConnection broker) {
64                 }
65             };
66             mqttService.addBrokersListener(observer);
67             assertTrue(semaphore.tryAcquire(700, TimeUnit.MILLISECONDS), "Wait for embedded connection client failed");
68         }
69         MqttBrokerConnection embeddedConnection = this.embeddedConnection;
70         if (embeddedConnection == null) {
71             throw new IllegalStateException();
72         }
73
74         logger.warn("waitForConnection {}", embeddedConnection.connectionState());
75         Semaphore semaphore = new Semaphore(1);
76         semaphore.acquire();
77         MqttConnectionObserver mqttConnectionObserver = (state, error) -> {
78             if (state == MqttConnectionState.CONNECTED) {
79                 semaphore.release();
80             }
81         };
82         embeddedConnection.addConnectionObserver(mqttConnectionObserver);
83         if (embeddedConnection.connectionState() == MqttConnectionState.CONNECTED) {
84             semaphore.release();
85         }
86         assertTrue(semaphore.tryAcquire(500, TimeUnit.MILLISECONDS), "Connection " + embeddedConnection.getClientId()
87                 + " failed. State: " + embeddedConnection.connectionState());
88         return embeddedConnection;
89     }
90 }