]> git.basschouten.com Git - openhab-addons.git/blob
0dfd246dc5b785f9638674cd8d404d43d95ee027
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2020 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.mqtt;
14
15 import static org.hamcrest.CoreMatchers.is;
16 import static org.junit.Assert.*;
17 import static org.mockito.ArgumentMatchers.any;
18 import static org.mockito.Mockito.*;
19 import static org.mockito.MockitoAnnotations.initMocks;
20
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.Collections;
24 import java.util.HashMap;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.concurrent.CompletableFuture;
28 import java.util.concurrent.CountDownLatch;
29 import java.util.concurrent.ExecutionException;
30 import java.util.concurrent.ScheduledExecutorService;
31 import java.util.concurrent.ScheduledThreadPoolExecutor;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.TimeoutException;
34
35 import org.openhab.core.library.types.OnOffType;
36 import org.openhab.core.types.State;
37 import org.openhab.core.types.UnDefType;
38 import org.openhab.core.util.UIDUtils;
39 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
40 import org.openhab.core.io.transport.mqtt.MqttConnectionObserver;
41 import org.openhab.core.io.transport.mqtt.MqttConnectionState;
42 import org.openhab.core.io.transport.mqtt.MqttService;
43 import org.openhab.core.test.java.JavaOSGiTest;
44 import org.junit.After;
45 import org.junit.Before;
46 import org.junit.Test;
47 import org.mockito.Mock;
48 import org.openhab.binding.mqtt.generic.AvailabilityTracker;
49 import org.openhab.binding.mqtt.generic.ChannelStateUpdateListener;
50 import org.openhab.binding.mqtt.generic.MqttChannelTypeProvider;
51 import org.openhab.binding.mqtt.generic.TransformationServiceProvider;
52 import org.openhab.binding.mqtt.homeassistant.internal.AbstractComponent;
53 import org.openhab.binding.mqtt.homeassistant.internal.ChannelConfigurationTypeAdapterFactory;
54 import org.openhab.binding.mqtt.homeassistant.internal.ComponentSwitch;
55 import org.openhab.binding.mqtt.homeassistant.internal.DiscoverComponents;
56 import org.openhab.binding.mqtt.homeassistant.internal.DiscoverComponents.ComponentDiscovered;
57 import org.openhab.binding.mqtt.homeassistant.internal.HaID;
58
59 import com.google.gson.Gson;
60 import com.google.gson.GsonBuilder;
61
62 /**
63  * A full implementation test, that starts the embedded MQTT broker and publishes a homeassistant MQTT discovery device
64  * tree.
65  *
66  * @author David Graeff - Initial contribution
67  */
68 public class HomeAssistantMQTTImplementationTest extends JavaOSGiTest {
69     private MqttService mqttService;
70     private MqttBrokerConnection embeddedConnection;
71     private MqttBrokerConnection connection;
72     private int registeredTopics = 100;
73     private Throwable failure = null;
74
75     @Mock
76     ChannelStateUpdateListener channelStateUpdateListener;
77
78     @Mock
79     AvailabilityTracker availabilityTracker;
80
81     @Mock
82     TransformationServiceProvider transformationServiceProvider;
83
84     /**
85      * Create an observer that fails the test as soon as the broker client connection changes its connection state
86      * to something else then CONNECTED.
87      */
88     private MqttConnectionObserver failIfChange = (state, error) -> assertThat(state,
89             is(MqttConnectionState.CONNECTED));
90     private String testObjectTopic;
91
92     @Before
93     public void setUp() throws InterruptedException, ExecutionException, TimeoutException, IOException {
94         registerVolatileStorageService();
95         initMocks(this);
96         mqttService = getService(MqttService.class);
97
98         // Wait for the EmbeddedBrokerService internal connection to be connected
99         embeddedConnection = new EmbeddedBrokerTools().waitForConnection(mqttService);
100
101         connection = new MqttBrokerConnection(embeddedConnection.getHost(), embeddedConnection.getPort(),
102                 embeddedConnection.isSecure(), "ha_mqtt");
103         connection.start().get(1000, TimeUnit.MILLISECONDS);
104         assertThat(connection.connectionState(), is(MqttConnectionState.CONNECTED));
105
106         // If the connection state changes in between -> fail
107         connection.addConnectionObserver(failIfChange);
108
109         // Create topic string and config for one example HA component (a Switch)
110         testObjectTopic = "homeassistant/switch/node/" + ThingChannelConstants.testHomeAssistantThing.getId();
111         final String config = "{'name':'testname','state_topic':'" + testObjectTopic + "/state','command_topic':'"
112                 + testObjectTopic + "/set'}";
113
114         // Publish component configurations and component states to MQTT
115         List<CompletableFuture<Boolean>> futures = new ArrayList<>();
116         futures.add(embeddedConnection.publish(testObjectTopic + "/config", config.getBytes()));
117         futures.add(embeddedConnection.publish(testObjectTopic + "/state", "true".getBytes()));
118
119         registeredTopics = futures.size();
120         CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(1000, TimeUnit.MILLISECONDS);
121
122         failure = null;
123
124         doReturn(null).when(transformationServiceProvider).getTransformationService(any());
125     }
126
127     @After
128     public void tearDown() throws InterruptedException, ExecutionException, TimeoutException {
129         if (connection != null) {
130             connection.removeConnectionObserver(failIfChange);
131             connection.stop().get(1000, TimeUnit.MILLISECONDS);
132         }
133     }
134
135     @Test
136     public void reconnectTest() throws InterruptedException, ExecutionException, TimeoutException {
137         connection.removeConnectionObserver(failIfChange);
138         connection.stop().get(2000, TimeUnit.MILLISECONDS);
139         connection = new MqttBrokerConnection(embeddedConnection.getHost(), embeddedConnection.getPort(),
140                 embeddedConnection.isSecure(), "ha_mqtt");
141         connection.start().get(2000, TimeUnit.MILLISECONDS);
142     }
143
144     @Test
145     public void retrieveAllTopics() throws InterruptedException, ExecutionException, TimeoutException {
146         CountDownLatch c = new CountDownLatch(registeredTopics);
147         connection.subscribe("homeassistant/+/+/" + ThingChannelConstants.testHomeAssistantThing.getId() + "/#",
148                 (topic, payload) -> c.countDown()).get(1000, TimeUnit.MILLISECONDS);
149         assertTrue("Connection " + connection.getClientId() + " not retrieving all topics",
150                 c.await(1000, TimeUnit.MILLISECONDS));
151     }
152
153     @Test
154     public void parseHATree() throws InterruptedException, ExecutionException, TimeoutException {
155         MqttChannelTypeProvider channelTypeProvider = mock(MqttChannelTypeProvider.class);
156
157         final Map<String, AbstractComponent<?>> haComponents = new HashMap<>();
158         Gson gson = new GsonBuilder().registerTypeAdapterFactory(new ChannelConfigurationTypeAdapterFactory()).create();
159
160         ScheduledExecutorService scheduler = new ScheduledThreadPoolExecutor(4);
161         DiscoverComponents discover = spy(new DiscoverComponents(ThingChannelConstants.testHomeAssistantThing,
162                 scheduler, channelStateUpdateListener, availabilityTracker, gson, transformationServiceProvider));
163
164         // The DiscoverComponents object calls ComponentDiscovered callbacks.
165         // In the following implementation we add the found component to the `haComponents` map
166         // and add the types to the channelTypeProvider, like in the real Thing handler.
167         final CountDownLatch latch = new CountDownLatch(1);
168         ComponentDiscovered cd = (haID, c) -> {
169             haComponents.put(c.uid().getId(), c);
170             c.addChannelTypes(channelTypeProvider);
171             channelTypeProvider.setChannelGroupType(c.groupTypeUID(), c.type());
172             latch.countDown();
173         };
174
175         // Start the discovery for 500ms. Forced timeout after 1500ms.
176         HaID haID = new HaID(testObjectTopic + "/config");
177         CompletableFuture<Void> future = discover.startDiscovery(connection, 1000, Collections.singleton(haID), cd)
178                 .thenRun(() -> {
179                 }).exceptionally(e -> {
180                     failure = e;
181                     return null;
182                 });
183
184         assertTrue(latch.await(1500, TimeUnit.MILLISECONDS));
185         future.get(800, TimeUnit.MILLISECONDS);
186
187         // No failure expected and one discovered result
188         assertNull(failure);
189         assertThat(haComponents.size(), is(1));
190
191         // For the switch component we should have one channel group type and one channel type
192         // setChannelGroupType is called once above
193         verify(channelTypeProvider, times(2)).setChannelGroupType(any(), any());
194         verify(channelTypeProvider, times(1)).setChannelType(any(), any());
195
196         String channelGroupId = UIDUtils
197                 .encode("node_" + ThingChannelConstants.testHomeAssistantThing.getId() + "_switch");
198
199         State value = haComponents.get(channelGroupId).channelTypes().get(ComponentSwitch.switchChannelID).getState()
200                 .getCache().getChannelState();
201         assertThat(value, is(UnDefType.UNDEF));
202
203         haComponents.values().stream().map(e -> e.start(connection, scheduler, 100))
204                 .reduce(CompletableFuture.completedFuture(null), (a, v) -> a.thenCompose(b -> v)).exceptionally(e -> {
205                     failure = e;
206                     return null;
207                 }).get();
208
209         // We should have received the retained value, while subscribing to the channels MQTT state topic.
210         verify(channelStateUpdateListener, timeout(1000).times(1)).updateChannelState(any(), any());
211
212         // Value should be ON now.
213         value = haComponents.get(channelGroupId).channelTypes().get(ComponentSwitch.switchChannelID).getState()
214                 .getCache().getChannelState();
215         assertThat(value, is(OnOffType.ON));
216     }
217 }