]> git.basschouten.com Git - openhab-addons.git/blob
c318ed535d17c2f64809e09ba37c53e550ba49f5
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2020 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.mqtt;
14
15 import static org.junit.jupiter.api.Assertions.assertTrue;
16
17 import java.util.concurrent.Semaphore;
18 import java.util.concurrent.TimeUnit;
19
20 import org.eclipse.jdt.annotation.NonNullByDefault;
21 import org.eclipse.jdt.annotation.Nullable;
22 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
23 import org.openhab.core.io.transport.mqtt.MqttConnectionObserver;
24 import org.openhab.core.io.transport.mqtt.MqttConnectionState;
25 import org.openhab.core.io.transport.mqtt.MqttService;
26 import org.openhab.core.io.transport.mqtt.MqttServiceObserver;
27 import org.openhab.io.mqttembeddedbroker.Constants;
28
29 /**
30  * A full implementation test, that starts the embedded MQTT broker and publishes a homeassistant MQTT discovery device
31  * tree.
32  *
33  * @author David Graeff - Initial contribution
34  */
35 @NonNullByDefault
36 public class EmbeddedBrokerTools {
37     public @Nullable MqttBrokerConnection embeddedConnection = null;
38
39     /**
40      * Request the embedded broker connection from the {@link MqttService} and wait for a connection to be established.
41      *
42      * @throws InterruptedException
43      */
44     public MqttBrokerConnection waitForConnection(MqttService mqttService) throws InterruptedException {
45         embeddedConnection = mqttService.getBrokerConnection(Constants.CLIENTID);
46         if (embeddedConnection == null) {
47             Semaphore semaphore = new Semaphore(1);
48             semaphore.acquire();
49             MqttServiceObserver observer = new MqttServiceObserver() {
50
51                 @Override
52                 public void brokerAdded(String brokerID, MqttBrokerConnection broker) {
53                     if (brokerID.equals(Constants.CLIENTID)) {
54                         embeddedConnection = broker;
55                         semaphore.release();
56                     }
57                 }
58
59                 @Override
60                 public void brokerRemoved(String brokerID, MqttBrokerConnection broker) {
61                 }
62             };
63             mqttService.addBrokersListener(observer);
64             assertTrue(semaphore.tryAcquire(1000, TimeUnit.MILLISECONDS), "Wait for embedded connection client failed");
65         }
66         MqttBrokerConnection embeddedConnection = this.embeddedConnection;
67         if (embeddedConnection == null) {
68             throw new IllegalStateException();
69         }
70
71         Semaphore semaphore = new Semaphore(1);
72         semaphore.acquire();
73         MqttConnectionObserver mqttConnectionObserver = (state, error) -> {
74             if (state == MqttConnectionState.CONNECTED) {
75                 semaphore.release();
76             }
77         };
78         embeddedConnection.addConnectionObserver(mqttConnectionObserver);
79         if (embeddedConnection.connectionState() == MqttConnectionState.CONNECTED) {
80             semaphore.release();
81         }
82         assertTrue(semaphore.tryAcquire(500, TimeUnit.MILLISECONDS), "Connection " + embeddedConnection.getClientId()
83                 + " failed. State: " + embeddedConnection.connectionState());
84         return embeddedConnection;
85     }
86 }