]> git.basschouten.com Git - openhab-addons.git/blob
24dc2be667e3c3fa6ed566e7a081d2d52055c651
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2020 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.mqtt;
14
15 import static org.hamcrest.CoreMatchers.is;
16 import static org.junit.Assert.*;
17 import static org.mockito.ArgumentMatchers.*;
18 import static org.mockito.Mockito.*;
19 import static org.mockito.MockitoAnnotations.initMocks;
20
21 import java.nio.charset.StandardCharsets;
22 import java.nio.file.Paths;
23 import java.util.ArrayList;
24 import java.util.Collections;
25 import java.util.List;
26 import java.util.concurrent.CompletableFuture;
27 import java.util.concurrent.CountDownLatch;
28 import java.util.concurrent.ExecutionException;
29 import java.util.concurrent.ScheduledExecutorService;
30 import java.util.concurrent.ScheduledThreadPoolExecutor;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.TimeoutException;
33
34 import org.openhab.core.library.types.DecimalType;
35 import org.openhab.core.library.types.OnOffType;
36 import org.openhab.core.types.UnDefType;
37 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
38 import org.openhab.core.io.transport.mqtt.MqttConnectionObserver;
39 import org.openhab.core.io.transport.mqtt.MqttConnectionState;
40 import org.openhab.core.io.transport.mqtt.MqttService;
41 import org.openhab.core.test.java.JavaOSGiTest;
42 import org.junit.After;
43 import org.junit.Before;
44 import org.junit.Test;
45 import org.mockito.Mock;
46 import org.mockito.invocation.InvocationOnMock;
47 import org.openhab.binding.mqtt.generic.ChannelState;
48 import org.openhab.binding.mqtt.generic.tools.ChildMap;
49 import org.openhab.binding.mqtt.generic.tools.WaitForTopicValue;
50 import org.openhab.binding.mqtt.homie.internal.handler.HomieThingHandler;
51 import org.openhab.binding.mqtt.homie.internal.homie300.Device;
52 import org.openhab.binding.mqtt.homie.internal.homie300.DeviceAttributes;
53 import org.openhab.binding.mqtt.homie.internal.homie300.DeviceAttributes.ReadyState;
54 import org.openhab.binding.mqtt.homie.internal.homie300.DeviceCallback;
55 import org.openhab.binding.mqtt.homie.internal.homie300.Node;
56 import org.openhab.binding.mqtt.homie.internal.homie300.NodeAttributes;
57 import org.openhab.binding.mqtt.homie.internal.homie300.Property;
58 import org.openhab.binding.mqtt.homie.internal.homie300.PropertyAttributes;
59 import org.openhab.binding.mqtt.homie.internal.homie300.PropertyAttributes.DataTypeEnum;
60 import org.openhab.binding.mqtt.homie.internal.homie300.PropertyHelper;
61
62 /**
63  * A full implementation test, that starts the embedded MQTT broker and publishes a homie device tree.
64  *
65  * @author David Graeff - Initial contribution
66  */
67 public class HomieImplementationTest extends JavaOSGiTest {
68     private static final String BASE_TOPIC = "homie";
69     private static final String DEVICE_ID = ThingChannelConstants.testHomieThing.getId();
70     private static final String DEVICE_TOPIC = BASE_TOPIC + "/" + DEVICE_ID;
71
72     private MqttService mqttService;
73     private MqttBrokerConnection embeddedConnection;
74     private MqttBrokerConnection connection;
75     private int registeredTopics = 100;
76
77     // The handler is not tested here, so just mock the callback
78     @Mock
79     DeviceCallback callback;
80
81     // A handler mock is required to verify that channel value changes have been received
82     @Mock
83     HomieThingHandler handler;
84
85     private ScheduledExecutorService scheduler;
86
87     /**
88      * Create an observer that fails the test as soon as the broker client connection changes its connection state
89      * to something else then CONNECTED.
90      */
91     private MqttConnectionObserver failIfChange = (state, error) -> assertThat(state,
92             is(MqttConnectionState.CONNECTED));
93
94     private String propertyTestTopic;
95
96     @Before
97     public void setUp() throws InterruptedException, ExecutionException, TimeoutException {
98         registerVolatileStorageService();
99         initMocks(this);
100         mqttService = getService(MqttService.class);
101
102         embeddedConnection = new EmbeddedBrokerTools().waitForConnection(mqttService);
103         embeddedConnection.setQos(1);
104         embeddedConnection.setRetain(true);
105
106         connection = new MqttBrokerConnection(embeddedConnection.getHost(), embeddedConnection.getPort(),
107                 embeddedConnection.isSecure(), "homie");
108         connection.setQos(1);
109         connection.setPersistencePath(Paths.get("subconn"));
110         connection.start().get(500, TimeUnit.MILLISECONDS);
111         assertThat(connection.connectionState(), is(MqttConnectionState.CONNECTED));
112         // If the connection state changes in between -> fail
113         connection.addConnectionObserver(failIfChange);
114
115         List<CompletableFuture<Boolean>> futures = new ArrayList<>();
116         futures.add(embeddedConnection.publish(DEVICE_TOPIC + "/$homie", "3.0".getBytes()));
117         futures.add(embeddedConnection.publish(DEVICE_TOPIC + "/$name", "Name".getBytes()));
118         futures.add(embeddedConnection.publish(DEVICE_TOPIC + "/$state", "ready".getBytes()));
119         futures.add(embeddedConnection.publish(DEVICE_TOPIC + "/$nodes", "testnode".getBytes()));
120
121         // Add homie node topics
122         final String testNode = DEVICE_TOPIC + "/testnode";
123         futures.add(embeddedConnection.publish(testNode + "/$name", "Testnode".getBytes()));
124         futures.add(embeddedConnection.publish(testNode + "/$type", "Type".getBytes()));
125         futures.add(
126                 embeddedConnection.publish(testNode + "/$properties", "temperature,doorbell,testRetain".getBytes()));
127
128         // Add homie property topics
129         final String property = testNode + "/temperature";
130         futures.add(embeddedConnection.publish(property, "10".getBytes()));
131         futures.add(embeddedConnection.publish(property + "/$name", "Testprop".getBytes()));
132         futures.add(embeddedConnection.publish(property + "/$settable", "true".getBytes()));
133         futures.add(embeddedConnection.publish(property + "/$unit", "°C".getBytes(StandardCharsets.UTF_8)));
134         futures.add(embeddedConnection.publish(property + "/$datatype", "float".getBytes()));
135         futures.add(embeddedConnection.publish(property + "/$format", "-100:100".getBytes()));
136
137         final String propertyBellTopic = testNode + "/doorbell";
138         futures.add(embeddedConnection.publish(propertyBellTopic + "/$name", "Doorbell".getBytes()));
139         futures.add(embeddedConnection.publish(propertyBellTopic + "/$settable", "false".getBytes()));
140         futures.add(embeddedConnection.publish(propertyBellTopic + "/$retained", "false".getBytes()));
141         futures.add(embeddedConnection.publish(propertyBellTopic + "/$datatype", "boolean".getBytes()));
142
143         this.propertyTestTopic = testNode + "/testRetain";
144         futures.add(embeddedConnection.publish(propertyTestTopic + "/$name", "Test".getBytes()));
145         futures.add(embeddedConnection.publish(propertyTestTopic + "/$settable", "true".getBytes()));
146         futures.add(embeddedConnection.publish(propertyTestTopic + "/$retained", "false".getBytes()));
147         futures.add(embeddedConnection.publish(propertyTestTopic + "/$datatype", "boolean".getBytes()));
148
149         registeredTopics = futures.size();
150         CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(1000, TimeUnit.MILLISECONDS);
151
152         scheduler = new ScheduledThreadPoolExecutor(6);
153     }
154
155     @After
156     public void tearDown() throws InterruptedException, ExecutionException, TimeoutException {
157         if (connection != null) {
158             connection.removeConnectionObserver(failIfChange);
159             connection.stop().get(500, TimeUnit.MILLISECONDS);
160         }
161         scheduler.shutdownNow();
162     }
163
164     @Test
165     public void retrieveAllTopics() throws InterruptedException, ExecutionException, TimeoutException {
166         // four topics are not under /testnode !
167         CountDownLatch c = new CountDownLatch(registeredTopics - 4);
168         connection.subscribe(DEVICE_TOPIC + "/testnode/#", (topic, payload) -> c.countDown()).get(5000,
169                 TimeUnit.MILLISECONDS);
170         assertTrue("Connection " + connection.getClientId() + " not retrieving all topics ",
171                 c.await(5000, TimeUnit.MILLISECONDS));
172     }
173
174     @Test
175     public void retrieveOneAttribute() throws InterruptedException, ExecutionException {
176         WaitForTopicValue watcher = new WaitForTopicValue(connection, DEVICE_TOPIC + "/$homie");
177         assertThat(watcher.waitForTopicValue(1000), is("3.0"));
178     }
179
180     @SuppressWarnings("null")
181     @Test
182     public void retrieveAttributes() throws InterruptedException, ExecutionException {
183         assertThat(connection.hasSubscribers(), is(false));
184
185         Node node = new Node(DEVICE_TOPIC, "testnode", ThingChannelConstants.testHomieThing, callback,
186                 new NodeAttributes());
187         Property property = spy(
188                 new Property(DEVICE_TOPIC + "/testnode", node, "temperature", callback, new PropertyAttributes()));
189
190         // Create a scheduler
191         ScheduledExecutorService scheduler = new ScheduledThreadPoolExecutor(4);
192
193         property.subscribe(connection, scheduler, 500).get();
194
195         assertThat(property.attributes.settable, is(true));
196         assertThat(property.attributes.retained, is(true));
197         assertThat(property.attributes.name, is("Testprop"));
198         assertThat(property.attributes.unit, is("°C"));
199         assertThat(property.attributes.datatype, is(DataTypeEnum.float_));
200         waitForAssert(() -> assertThat(property.attributes.format, is("-100:100")));
201         verify(property, timeout(500).atLeastOnce()).attributesReceived();
202
203         // Receive property value
204         ChannelState channelState = spy(property.getChannelState());
205         PropertyHelper.setChannelState(property, channelState);
206
207         property.startChannel(connection, scheduler, 500).get();
208         verify(channelState).start(any(), any(), anyInt());
209         verify(channelState, timeout(500)).processMessage(any(), any());
210         verify(callback).updateChannelState(any(), any());
211
212         assertThat(property.getChannelState().getCache().getChannelState(), is(new DecimalType(10)));
213
214         property.stop().get();
215         assertThat(connection.hasSubscribers(), is(false));
216     }
217
218     // Inject a spy'ed property
219     public Property createSpyProperty(InvocationOnMock invocation) {
220         final Node node = (Node) invocation.getMock();
221         final String id = (String) invocation.getArguments()[0];
222         return spy(node.createProperty(id, spy(new PropertyAttributes())));
223     }
224
225     // Inject a spy'ed node
226     public Node createSpyNode(InvocationOnMock invocation) {
227         final Device device = (Device) invocation.getMock();
228         final String id = (String) invocation.getArguments()[0];
229         // Create the node
230         Node node = spy(device.createNode(id, spy(new NodeAttributes())));
231         // Intercept creating a property in the next call and inject a spy'ed property.
232         doAnswer(this::createSpyProperty).when(node).createProperty(any());
233         return node;
234     }
235
236     @SuppressWarnings("null")
237     @Test
238     public void parseHomieTree() throws InterruptedException, ExecutionException, TimeoutException {
239         // Create a Homie Device object. Because spied Nodes are required for call verification,
240         // the full Device constructor need to be used and a ChildMap object need to be created manually.
241         ChildMap<Node> nodeMap = new ChildMap<>();
242         Device device = spy(
243                 new Device(ThingChannelConstants.testHomieThing, callback, new DeviceAttributes(), nodeMap));
244
245         // Intercept creating a node in initialize()->start() and inject a spy'ed node.
246         doAnswer(this::createSpyNode).when(device).createNode(any());
247
248         // initialize the device, subscribe and wait.
249         device.initialize(BASE_TOPIC, DEVICE_ID, Collections.emptyList());
250         device.subscribe(connection, scheduler, 1500).get();
251
252         assertThat(device.isInitialized(), is(true));
253
254         // Check device attributes
255         assertThat(device.attributes.homie, is("3.0"));
256         assertThat(device.attributes.name, is("Name"));
257         assertThat(device.attributes.state, is(ReadyState.ready));
258         assertThat(device.attributes.nodes.length, is(1));
259         verify(device, times(4)).attributeChanged(any(), any(), any(), any(), anyBoolean());
260         verify(callback).readyStateChanged(eq(ReadyState.ready));
261         verify(device).attributesReceived(any(), any(), anyInt());
262
263         // Expect 1 node
264         assertThat(device.nodes.size(), is(1));
265
266         // Check node and node attributes
267         Node node = device.nodes.get("testnode");
268         verify(node).subscribe(any(), any(), anyInt());
269         verify(node).attributesReceived(any(), any(), anyInt());
270         verify(node.attributes).subscribeAndReceive(any(), any(), anyString(), any(), anyInt());
271         assertThat(node.attributes.type, is("Type"));
272         assertThat(node.attributes.name, is("Testnode"));
273
274         // Expect 2 property
275         assertThat(node.properties.size(), is(3));
276
277         // Check property and property attributes
278         Property property = node.properties.get("temperature");
279         assertThat(property.attributes.settable, is(true));
280         assertThat(property.attributes.retained, is(true));
281         assertThat(property.attributes.name, is("Testprop"));
282         assertThat(property.attributes.unit, is("°C"));
283         assertThat(property.attributes.datatype, is(DataTypeEnum.float_));
284         assertThat(property.attributes.format, is("-100:100"));
285         verify(property).attributesReceived();
286         assertNotNull(property.getChannelState());
287         assertThat(property.getType().getState().getMinimum().intValue(), is(-100));
288         assertThat(property.getType().getState().getMaximum().intValue(), is(100));
289
290         // Check property and property attributes
291         Property propertyBell = node.properties.get("doorbell");
292         verify(propertyBell).attributesReceived();
293         assertThat(propertyBell.attributes.settable, is(false));
294         assertThat(propertyBell.attributes.retained, is(false));
295         assertThat(propertyBell.attributes.name, is("Doorbell"));
296         assertThat(propertyBell.attributes.datatype, is(DataTypeEnum.boolean_));
297
298         // The device->node->property tree is ready. Now subscribe to property values.
299         device.startChannels(connection, scheduler, 50, handler).get();
300         assertThat(propertyBell.getChannelState().isStateful(), is(false));
301         assertThat(propertyBell.getChannelState().getCache().getChannelState(), is(UnDefType.UNDEF));
302         assertThat(property.getChannelState().getCache().getChannelState(), is(new DecimalType(10)));
303
304         property = node.properties.get("testRetain");
305         WaitForTopicValue watcher = new WaitForTopicValue(embeddedConnection, propertyTestTopic + "/set");
306         // Watch the topic. Publish a retain=false value to MQTT
307         property.getChannelState().publishValue(OnOffType.OFF).get();
308         assertThat(watcher.waitForTopicValue(1000), is("false"));
309
310         // Publish a retain=false value to MQTT.
311         property.getChannelState().publishValue(OnOffType.ON).get();
312         // No value is expected to be retained on this MQTT topic
313         waitForAssert(() -> {
314             try {
315                 WaitForTopicValue w = new WaitForTopicValue(embeddedConnection, propertyTestTopic + "/set");
316                 assertNull(w.waitForTopicValue(50));
317             } catch (InterruptedException | ExecutionException e) {
318             }
319         }, 500, 100);
320     }
321 }