]> git.basschouten.com Git - openhab-addons.git/blob
647b3a31c5b0df03805c1c731bc288ad3d851cfa
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2020 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.mqtt;
14
15 import static org.junit.jupiter.api.Assertions.assertTrue;
16
17 import java.util.concurrent.Semaphore;
18 import java.util.concurrent.TimeUnit;
19
20 import org.eclipse.jdt.annotation.NonNull;
21 import org.eclipse.jdt.annotation.NonNullByDefault;
22 import org.eclipse.jdt.annotation.Nullable;
23 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
24 import org.openhab.core.io.transport.mqtt.MqttConnectionObserver;
25 import org.openhab.core.io.transport.mqtt.MqttConnectionState;
26 import org.openhab.core.io.transport.mqtt.MqttService;
27 import org.openhab.core.io.transport.mqtt.MqttServiceObserver;
28 import org.openhab.io.mqttembeddedbroker.Constants;
29
30 /**
31  * A full implementation test, that starts the embedded MQTT broker and publishes a homeassistant MQTT discovery device
32  * tree.
33  *
34  * @author David Graeff - Initial contribution
35  */
36 @NonNullByDefault
37 public class EmbeddedBrokerTools {
38     public @Nullable MqttBrokerConnection embeddedConnection = null;
39
40     /**
41      * Request the embedded broker connection from the {@link MqttService} and wait for a connection to be established.
42      *
43      * @throws InterruptedException
44      */
45     public MqttBrokerConnection waitForConnection(MqttService mqttService) throws InterruptedException {
46         embeddedConnection = mqttService.getBrokerConnection(Constants.CLIENTID);
47         if (embeddedConnection == null) {
48             Semaphore semaphore = new Semaphore(1);
49             semaphore.acquire();
50             MqttServiceObserver observer = new MqttServiceObserver() {
51
52                 @Override
53                 public void brokerAdded(@NonNull String brokerID, @NonNull MqttBrokerConnection broker) {
54                     if (brokerID.equals(Constants.CLIENTID)) {
55                         embeddedConnection = broker;
56                         semaphore.release();
57                     }
58                 }
59
60                 @Override
61                 public void brokerRemoved(@NonNull String brokerID, @NonNull MqttBrokerConnection broker) {
62                 }
63
64             };
65             mqttService.addBrokersListener(observer);
66             assertTrue(semaphore.tryAcquire(1000, TimeUnit.MILLISECONDS), "Wait for embedded connection client failed");
67         }
68         MqttBrokerConnection embeddedConnection = this.embeddedConnection;
69         if (embeddedConnection == null) {
70             throw new IllegalStateException();
71         }
72
73         Semaphore semaphore = new Semaphore(1);
74         semaphore.acquire();
75         MqttConnectionObserver mqttConnectionObserver = (state, error) -> {
76             if (state == MqttConnectionState.CONNECTED) {
77                 semaphore.release();
78             }
79         };
80         embeddedConnection.addConnectionObserver(mqttConnectionObserver);
81         if (embeddedConnection.connectionState() == MqttConnectionState.CONNECTED) {
82             semaphore.release();
83         }
84         assertTrue(semaphore.tryAcquire(500, TimeUnit.MILLISECONDS), "Connection " + embeddedConnection.getClientId()
85                 + " failed. State: " + embeddedConnection.connectionState());
86         return embeddedConnection;
87     }
88
89 }