2 * Copyright (c) 2010-2020 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.mqtt;
15 import static org.hamcrest.CoreMatchers.is;
16 import static org.junit.Assert.*;
17 import static org.mockito.ArgumentMatchers.any;
18 import static org.mockito.Mockito.*;
19 import static org.mockito.MockitoAnnotations.initMocks;
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.Collections;
24 import java.util.HashMap;
25 import java.util.List;
27 import java.util.concurrent.CompletableFuture;
28 import java.util.concurrent.CountDownLatch;
29 import java.util.concurrent.ExecutionException;
30 import java.util.concurrent.ScheduledExecutorService;
31 import java.util.concurrent.ScheduledThreadPoolExecutor;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.TimeoutException;
35 import org.openhab.core.library.types.OnOffType;
36 import org.openhab.core.types.State;
37 import org.openhab.core.types.UnDefType;
38 import org.openhab.core.util.UIDUtils;
39 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
40 import org.openhab.core.io.transport.mqtt.MqttConnectionObserver;
41 import org.openhab.core.io.transport.mqtt.MqttConnectionState;
42 import org.openhab.core.io.transport.mqtt.MqttService;
43 import org.openhab.core.test.java.JavaOSGiTest;
44 import org.junit.After;
45 import org.junit.Before;
46 import org.junit.Test;
47 import org.mockito.Mock;
48 import org.openhab.binding.mqtt.generic.AvailabilityTracker;
49 import org.openhab.binding.mqtt.generic.ChannelStateUpdateListener;
50 import org.openhab.binding.mqtt.generic.MqttChannelTypeProvider;
51 import org.openhab.binding.mqtt.generic.TransformationServiceProvider;
52 import org.openhab.binding.mqtt.homeassistant.internal.AbstractComponent;
53 import org.openhab.binding.mqtt.homeassistant.internal.ChannelConfigurationTypeAdapterFactory;
54 import org.openhab.binding.mqtt.homeassistant.internal.ComponentSwitch;
55 import org.openhab.binding.mqtt.homeassistant.internal.DiscoverComponents;
56 import org.openhab.binding.mqtt.homeassistant.internal.DiscoverComponents.ComponentDiscovered;
57 import org.openhab.binding.mqtt.homeassistant.internal.HaID;
59 import com.google.gson.Gson;
60 import com.google.gson.GsonBuilder;
63 * A full implementation test, that starts the embedded MQTT broker and publishes a homeassistant MQTT discovery device
66 * @author David Graeff - Initial contribution
// Integration test class built on JavaOSGiTest. The fields below hold the MQTT service, the two
// broker connections used by the tests, and the collaborators handed to DiscoverComponents.
68 public class HomeAssistantMQTTImplementationTest extends JavaOSGiTest {
// MQTT service, looked up with getService(MqttService.class) in setUp().
69 private MqttService mqttService;
// Connection returned by EmbeddedBrokerTools.waitForConnection(); setUp() publishes the test messages through it.
70 private MqttBrokerConnection embeddedConnection;
// Second connection with client id "ha_mqtt", created in setUp() from the host/port/secure flag of
// embeddedConnection. The tests subscribe and run discovery through this one.
71 private MqttBrokerConnection connection;
// Overwritten in setUp() with the number of publish futures; retrieveAllTopics() uses it as latch count.
72 private int registeredTopics = 100;
// NOTE(review): presumably records an exception raised inside an asynchronous callback so the test
// thread can check it afterwards — confirm against the exceptionally(...) handlers.
73 private Throwable failure = null;
// Passed to the DiscoverComponents constructor and later checked with Mockito verify(...), so
// presumably a mock — confirm how it is initialised.
76 ChannelStateUpdateListener channelStateUpdateListener;
// Passed to the DiscoverComponents constructor; presumably a mock as well — confirm.
79 AvailabilityTracker availabilityTracker;
// Stubbed in setUp() via doReturn(null).when(...), so it has to be a Mockito mock or spy at that point.
82 TransformationServiceProvider transformationServiceProvider;
85 * Create an observer that fails the test as soon as the broker client connection changes its connection state
86 * to something other than CONNECTED.
// Two-argument observer (state, error) that asserts the reported state is MqttConnectionState.CONNECTED.
// It is added to `connection` in setUp() and removed again in tearDown() and reconnectTest().
88 private MqttConnectionObserver failIfChange = (state, error) -> assertThat(state,
89 is(MqttConnectionState.CONNECTED));
// Topic prefix "homeassistant/switch/node/<thing id>", assigned in setUp(). The suffixes "/config",
// "/state" and "/set" are appended to it where needed.
90 private String testObjectTopic;
// Test fixture set-up: obtains the MqttService, waits for the embedded broker connection, opens a
// second client connection ("ha_mqtt") to the same broker, and publishes a config message and a
// state message for one Home Assistant switch component.
// The timed get(...) calls below are what can raise ExecutionException / TimeoutException /
// InterruptedException when a step fails or takes longer than its limit.
93 public void setUp() throws InterruptedException, ExecutionException, TimeoutException, IOException {
94 registerVolatileStorageService();
96 mqttService = getService(MqttService.class);
98 // Wait for the EmbeddedBrokerService internal connection to be connected
99 embeddedConnection = new EmbeddedBrokerTools().waitForConnection(mqttService);
// Open a separate client connection that reuses host, port and secure flag of the embedded connection.
101 connection = new MqttBrokerConnection(embeddedConnection.getHost(), embeddedConnection.getPort(),
102 embeddedConnection.isSecure(), "ha_mqtt");
// Starting must finish within 1000ms, and the connection must then report CONNECTED.
103 connection.start().get(1000, TimeUnit.MILLISECONDS);
104 assertThat(connection.connectionState(), is(MqttConnectionState.CONNECTED));
106 // If the connection state changes in between -> fail
107 connection.addConnectionObserver(failIfChange);
109 // Create topic string and config for one example HA component (a Switch)
110 testObjectTopic = "homeassistant/switch/node/" + ThingChannelConstants.testHomeAssistantThing.getId();
// Single-quoted JSON-style text; its state and command topics are "<testObjectTopic>/state" and "<testObjectTopic>/set".
111 final String config = "{'name':'testname','state_topic':'" + testObjectTopic + "/state','command_topic':'"
112 + testObjectTopic + "/set'}";
114 // Publish component configurations and component states to MQTT
115 List<CompletableFuture<Boolean>> futures = new ArrayList<>();
116 futures.add(embeddedConnection.publish(testObjectTopic + "/config", config.getBytes()));
// The state payload is the text "true".
117 futures.add(embeddedConnection.publish(testObjectTopic + "/state", "true".getBytes()));
// Remember how many messages were published; retrieveAllTopics() expects to receive that many.
119 registeredTopics = futures.size();
// Block until every publish future has completed, at most 1000ms in total.
120 CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(1000, TimeUnit.MILLISECONDS);
// Stub the provider so that getTransformationService(...) returns null for any argument.
124 doReturn(null).when(transformationServiceProvider).getTransformationService(any());
// Test fixture clean-up: when a client connection exists, the failIfChange observer is removed and
// the connection is stopped, waiting at most 1000ms for the stop to complete.
128 public void tearDown() throws InterruptedException, ExecutionException, TimeoutException {
129 if (connection != null) {
// Observer is detached before stopping — presumably so the state change caused by stop() does not
// trip its CONNECTED assertion (confirm).
130 connection.removeConnectionObserver(failIfChange);
131 connection.stop().get(1000, TimeUnit.MILLISECONDS);
// Stops the current client connection and starts a new MqttBrokerConnection with the same host,
// port, secure flag and client id ("ha_mqtt"). Each step has to complete within 2000ms, otherwise
// the timed get(...) throws and the test fails.
136 public void reconnectTest() throws InterruptedException, ExecutionException, TimeoutException {
// Detach the state-change observer before the connection is deliberately stopped.
137 connection.removeConnectionObserver(failIfChange);
138 connection.stop().get(2000, TimeUnit.MILLISECONDS);
// The replacement connection is stored in the same field, so tearDown() operates on it.
139 connection = new MqttBrokerConnection(embeddedConnection.getHost(), embeddedConnection.getPort(),
140 embeddedConnection.isSecure(), "ha_mqtt");
141 connection.start().get(2000, TimeUnit.MILLISECONDS);
// Subscribes to "homeassistant/+/+/<thing id>/#" on the client connection and expects
// `registeredTopics` messages to be delivered to the subscriber.
145 public void retrieveAllTopics() throws InterruptedException, ExecutionException, TimeoutException {
// One count per message published in setUp().
146 CountDownLatch c = new CountDownLatch(registeredTopics);
// Every received message counts the latch down once; the subscribe call itself must finish within 1000ms.
147 connection.subscribe("homeassistant/+/+/" + ThingChannelConstants.testHomeAssistantThing.getId() + "/#",
148 (topic, payload) -> c.countDown()).get(1000, TimeUnit.MILLISECONDS);
// await(...) returns false if the latch has not reached zero after 1000ms, which fails the assertion.
149 assertTrue("Connection " + connection.getClientId() + " not retrieving all topics",
150 c.await(1000, TimeUnit.MILLISECONDS));
// Runs Home Assistant component discovery over the client connection and checks the outcome:
// exactly one component ends up in haComponents, its switch channel state is UnDefType.UNDEF before
// the component is started, and OnOffType.ON after it has been started.
154 public void parseHATree() throws InterruptedException, ExecutionException, TimeoutException {
// Mockito mock, so that the calls made on it can be verified further down.
155 MqttChannelTypeProvider channelTypeProvider = mock(MqttChannelTypeProvider.class);
157 final Map<String, AbstractComponent<?>> haComponents = new HashMap<>();
158 Gson gson = new GsonBuilder().registerTypeAdapterFactory(new ChannelConfigurationTypeAdapterFactory()).create();
// Scheduled executor with 4 core threads, used by the discovery and again by the components' start().
// NOTE(review): confirm that this executor is shut down when the test ends.
160 ScheduledExecutorService scheduler = new ScheduledThreadPoolExecutor(4);
// spy(...) wraps a real DiscoverComponents instance.
161 DiscoverComponents discover = spy(new DiscoverComponents(ThingChannelConstants.testHomeAssistantThing,
162 scheduler, channelStateUpdateListener, availabilityTracker, gson, transformationServiceProvider));
164 // The DiscoverComponents object calls ComponentDiscovered callbacks.
165 // In the following implementation we add the found component to the `haComponents` map
166 // and add the types to the channelTypeProvider, like in the real Thing handler.
// Latch with a count of one; the test thread waits on it below.
167 final CountDownLatch latch = new CountDownLatch(1);
168 ComponentDiscovered cd = (haID, c) -> {
// Components are keyed by the id part of their uid.
169 haComponents.put(c.uid().getId(), c);
170 c.addChannelTypes(channelTypeProvider);
171 channelTypeProvider.setChannelGroupType(c.groupTypeUID(), c.type());
175 // Start the discovery; 1000 is passed to startDiscovery (presumably the discovery time in ms - confirm). The latch is awaited for at most 1500ms.
176 HaID haID = new HaID(testObjectTopic + "/config");
177 CompletableFuture<Void> future = discover.startDiscovery(connection, 1000, Collections.singleton(haID), cd)
179 }).exceptionally(e -> {
// Wait up to 1500ms for the latch, then up to 800ms for the discovery future itself.
184 assertTrue(latch.await(1500, TimeUnit.MILLISECONDS));
185 future.get(800, TimeUnit.MILLISECONDS);
187 // No failure expected and one discovered result
189 assertThat(haComponents.size(), is(1));
191 // For the switch component we should have one channel group type and one channel type
192 // setChannelGroupType is called once above
193 verify(channelTypeProvider, times(2)).setChannelGroupType(any(), any());
194 verify(channelTypeProvider, times(1)).setChannelType(any(), any());
// Map key the discovered switch component is expected to be stored under:
// the UIDUtils-encoded form of "node_<thing id>_switch".
196 String channelGroupId = UIDUtils
197 .encode("node_" + ThingChannelConstants.testHomeAssistantThing.getId() + "_switch");
// Before the component is started, the cached channel state is expected to be UNDEF.
199 State value = haComponents.get(channelGroupId).channelTypes().get(ComponentSwitch.switchChannelID).getState()
200 .getCache().getChannelState();
201 assertThat(value, is(UnDefType.UNDEF));
// map(...) calls start() on every discovered component; reduce(...) then folds the returned futures
// into a single future by chaining them with thenCompose.
203 haComponents.values().stream().map(e -> e.start(connection, scheduler, 100))
204 .reduce(CompletableFuture.completedFuture(null), (a, v) -> a.thenCompose(b -> v)).exceptionally(e -> {
209 // We should have received the retained value, while subscribing to the channels MQTT state topic.
// timeout(1000) makes Mockito wait up to 1000ms for exactly one updateChannelState(...) call.
210 verify(channelStateUpdateListener, timeout(1000).times(1)).updateChannelState(any(), any());
212 // Value should be ON now.
213 value = haComponents.get(channelGroupId).channelTypes().get(ComponentSwitch.switchChannelID).getState()
214 .getCache().getChannelState();
215 assertThat(value, is(OnOffType.ON));