2 * Copyright (c) 2010-2020 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.mqtt;
15 import static org.junit.Assert.assertTrue;
17 import java.util.concurrent.Semaphore;
18 import java.util.concurrent.TimeUnit;
20 import org.eclipse.jdt.annotation.NonNull;
21 import org.eclipse.jdt.annotation.NonNullByDefault;
22 import org.eclipse.jdt.annotation.Nullable;
23 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
24 import org.openhab.core.io.transport.mqtt.MqttConnectionObserver;
25 import org.openhab.core.io.transport.mqtt.MqttConnectionState;
26 import org.openhab.core.io.transport.mqtt.MqttService;
27 import org.openhab.core.io.transport.mqtt.MqttServiceObserver;
28 import org.openhab.io.mqttembeddedbroker.Constants;
31 * A full implementation test that starts the embedded MQTT broker and publishes a Home Assistant MQTT discovery device
34 * @author David Graeff - Initial contribution
37 public class EmbeddedBrokerTools {
38 public @Nullable MqttBrokerConnection embeddedConnection = null;
41 * Requests the embedded broker connection from the {@link MqttService} and waits for a connection to be established.
43 * @throws InterruptedException if the calling thread is interrupted while waiting on the timed semaphore acquire
45 public MqttBrokerConnection waitForConnection(MqttService mqttService) throws InterruptedException {
46 embeddedConnection = mqttService.getBrokerConnection(Constants.CLIENTID);
47 if (embeddedConnection == null) {
48 Semaphore semaphore = new Semaphore(1);
50 MqttServiceObserver observer = new MqttServiceObserver() {
53 public void brokerAdded(@NonNull String brokerID, @NonNull MqttBrokerConnection broker) {
54 if (brokerID.equals(Constants.CLIENTID)) {
55 embeddedConnection = broker;
61 public void brokerRemoved(@NonNull String brokerID, @NonNull MqttBrokerConnection broker) {
65 mqttService.addBrokersListener(observer);
66 assertTrue("Wait for embedded connection client failed", semaphore.tryAcquire(1000, TimeUnit.MILLISECONDS));
68 MqttBrokerConnection embeddedConnection = this.embeddedConnection;
69 if (embeddedConnection == null) {
70 throw new IllegalStateException();
73 Semaphore semaphore = new Semaphore(1);
75 MqttConnectionObserver mqttConnectionObserver = (state, error) -> {
76 if (state == MqttConnectionState.CONNECTED) {
80 embeddedConnection.addConnectionObserver(mqttConnectionObserver);
81 if (embeddedConnection.connectionState() == MqttConnectionState.CONNECTED) {
84 assertTrue("Connection " + embeddedConnection.getClientId() + " failed. State: "
85 + embeddedConnection.connectionState(), semaphore.tryAcquire(500, TimeUnit.MILLISECONDS));
86 return embeddedConnection;