]> git.basschouten.com Git - openhab-addons.git/blob
816ec3b75880073c171f7ef50d23c076df6178bb
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2020 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.mqtt;
14
15 import static org.hamcrest.CoreMatchers.is;
16 import static org.hamcrest.MatcherAssert.assertThat;
17 import static org.junit.jupiter.api.Assertions.*;
18 import static org.mockito.ArgumentMatchers.*;
19 import static org.mockito.Mockito.*;
20 import static org.mockito.MockitoAnnotations.openMocks;
21
22 import java.nio.charset.StandardCharsets;
23 import java.util.ArrayList;
24 import java.util.Collections;
25 import java.util.List;
26 import java.util.concurrent.CompletableFuture;
27 import java.util.concurrent.CountDownLatch;
28 import java.util.concurrent.ExecutionException;
29 import java.util.concurrent.ScheduledExecutorService;
30 import java.util.concurrent.ScheduledThreadPoolExecutor;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.TimeoutException;
33
34 import org.eclipse.jdt.annotation.NonNullByDefault;
35 import org.junit.jupiter.api.AfterEach;
36 import org.junit.jupiter.api.BeforeEach;
37 import org.junit.jupiter.api.Test;
38 import org.mockito.Mock;
39 import org.mockito.invocation.InvocationOnMock;
40 import org.openhab.binding.mqtt.generic.ChannelState;
41 import org.openhab.binding.mqtt.generic.tools.ChildMap;
42 import org.openhab.binding.mqtt.generic.tools.WaitForTopicValue;
43 import org.openhab.binding.mqtt.homie.internal.handler.HomieThingHandler;
44 import org.openhab.binding.mqtt.homie.internal.homie300.Device;
45 import org.openhab.binding.mqtt.homie.internal.homie300.DeviceAttributes;
46 import org.openhab.binding.mqtt.homie.internal.homie300.DeviceAttributes.ReadyState;
47 import org.openhab.binding.mqtt.homie.internal.homie300.DeviceCallback;
48 import org.openhab.binding.mqtt.homie.internal.homie300.Node;
49 import org.openhab.binding.mqtt.homie.internal.homie300.NodeAttributes;
50 import org.openhab.binding.mqtt.homie.internal.homie300.Property;
51 import org.openhab.binding.mqtt.homie.internal.homie300.PropertyAttributes;
52 import org.openhab.binding.mqtt.homie.internal.homie300.PropertyAttributes.DataTypeEnum;
53 import org.openhab.binding.mqtt.homie.internal.homie300.PropertyHelper;
54 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
55 import org.openhab.core.io.transport.mqtt.MqttConnectionObserver;
56 import org.openhab.core.io.transport.mqtt.MqttConnectionState;
57 import org.openhab.core.io.transport.mqtt.MqttService;
58 import org.openhab.core.library.types.DecimalType;
59 import org.openhab.core.library.types.OnOffType;
60 import org.openhab.core.test.java.JavaOSGiTest;
61 import org.openhab.core.types.UnDefType;
62 import org.osgi.service.cm.ConfigurationAdmin;
63
64 /**
65  * A full implementation test, that starts the embedded MQTT broker and publishes a homie device tree.
66  *
67  * @author David Graeff - Initial contribution
68  */
69 @NonNullByDefault
70 public class HomieImplementationTest extends JavaOSGiTest {
71     private static final String BASE_TOPIC = "homie";
72     private static final String DEVICE_ID = ThingChannelConstants.testHomieThing.getId();
73     private static final String DEVICE_TOPIC = BASE_TOPIC + "/" + DEVICE_ID;
74
75     private @NonNullByDefault({}) ConfigurationAdmin configurationAdmin;
76     private @NonNullByDefault({}) MqttService mqttService;
77     private @NonNullByDefault({}) MqttBrokerConnection embeddedConnection;
78     private @NonNullByDefault({}) MqttBrokerConnection connection;
79     private int registeredTopics = 100;
80
81     private @NonNullByDefault({}) AutoCloseable mocksCloseable;
82
83     // The handler is not tested here, so just mock the callback
84     private @Mock @NonNullByDefault({}) DeviceCallback callback;
85
86     // A handler mock is required to verify that channel value changes have been received
87     private @Mock @NonNullByDefault({}) HomieThingHandler handler;
88
89     private @NonNullByDefault({}) ScheduledExecutorService scheduler;
90
91     /**
92      * Create an observer that fails the test as soon as the broker client connection changes its connection state
93      * to something else then CONNECTED.
94      */
95     private MqttConnectionObserver failIfChange = (state, error) -> assertThat(state,
96             is(MqttConnectionState.CONNECTED));
97
98     private String propertyTestTopic = "";
99
100     @BeforeEach
101     public void beforeEach() throws Exception {
102         registerVolatileStorageService();
103         mocksCloseable = openMocks(this);
104         configurationAdmin = getService(ConfigurationAdmin.class);
105         mqttService = getService(MqttService.class);
106
107         // Wait for the EmbeddedBrokerService internal connection to be connected
108         embeddedConnection = new EmbeddedBrokerTools(configurationAdmin, mqttService).waitForConnection();
109         embeddedConnection.setQos(1);
110
111         connection = new MqttBrokerConnection(embeddedConnection.getHost(), embeddedConnection.getPort(),
112                 embeddedConnection.isSecure(), "homie");
113         connection.setQos(1);
114         connection.start().get(5, TimeUnit.SECONDS);
115         assertThat(connection.connectionState(), is(MqttConnectionState.CONNECTED));
116         // If the connection state changes in between -> fail
117         connection.addConnectionObserver(failIfChange);
118
119         List<CompletableFuture<Boolean>> futures = new ArrayList<>();
120         futures.add(publish(DEVICE_TOPIC + "/$homie", "3.0"));
121         futures.add(publish(DEVICE_TOPIC + "/$name", "Name"));
122         futures.add(publish(DEVICE_TOPIC + "/$state", "ready"));
123         futures.add(publish(DEVICE_TOPIC + "/$nodes", "testnode"));
124
125         // Add homie node topics
126         final String testNode = DEVICE_TOPIC + "/testnode";
127         futures.add(publish(testNode + "/$name", "Testnode"));
128         futures.add(publish(testNode + "/$type", "Type"));
129         futures.add(publish(testNode + "/$properties", "temperature,doorbell,testRetain"));
130
131         // Add homie property topics
132         final String property = testNode + "/temperature";
133         futures.add(publish(property, "10"));
134         futures.add(publish(property + "/$name", "Testprop"));
135         futures.add(publish(property + "/$settable", "true"));
136         futures.add(publish(property + "/$unit", "°C"));
137         futures.add(publish(property + "/$datatype", "float"));
138         futures.add(publish(property + "/$format", "-100:100"));
139
140         final String propertyBellTopic = testNode + "/doorbell";
141         futures.add(publish(propertyBellTopic + "/$name", "Doorbell"));
142         futures.add(publish(propertyBellTopic + "/$settable", "false"));
143         futures.add(publish(propertyBellTopic + "/$retained", "false"));
144         futures.add(publish(propertyBellTopic + "/$datatype", "boolean"));
145
146         this.propertyTestTopic = testNode + "/testRetain";
147         futures.add(publish(propertyTestTopic + "/$name", "Test"));
148         futures.add(publish(propertyTestTopic + "/$settable", "true"));
149         futures.add(publish(propertyTestTopic + "/$retained", "false"));
150         futures.add(publish(propertyTestTopic + "/$datatype", "boolean"));
151
152         registeredTopics = futures.size();
153         CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(2, TimeUnit.SECONDS);
154
155         scheduler = new ScheduledThreadPoolExecutor(6);
156     }
157
158     private CompletableFuture<Boolean> publish(String topic, String message) {
159         return embeddedConnection.publish(topic, message.getBytes(StandardCharsets.UTF_8), 0, true);
160     }
161
162     @AfterEach
163     public void afterEach() throws Exception {
164         if (connection != null) {
165             connection.removeConnectionObserver(failIfChange);
166             connection.stop().get(2, TimeUnit.SECONDS);
167         }
168         if (scheduler != null) {
169             scheduler.shutdownNow();
170         }
171         mocksCloseable.close();
172     }
173
174     @Test
175     public void retrieveAllTopics() throws InterruptedException, ExecutionException, TimeoutException {
176         // four topics are not under /testnode !
177         CountDownLatch c = new CountDownLatch(registeredTopics - 4);
178         connection.subscribe(DEVICE_TOPIC + "/testnode/#", (topic, payload) -> c.countDown()).get(5, TimeUnit.SECONDS);
179         assertTrue(c.await(5, TimeUnit.SECONDS),
180                 "Connection " + connection.getClientId() + " not retrieving all topics ");
181     }
182
183     @Test
184     public void retrieveOneAttribute() throws InterruptedException, ExecutionException {
185         WaitForTopicValue watcher = new WaitForTopicValue(connection, DEVICE_TOPIC + "/$homie");
186         assertThat(watcher.waitForTopicValue(1000), is("3.0"));
187     }
188
189     @SuppressWarnings("null")
190     @Test
191     public void retrieveAttributes() throws InterruptedException, ExecutionException {
192         assertThat(connection.hasSubscribers(), is(false));
193
194         Node node = new Node(DEVICE_TOPIC, "testnode", ThingChannelConstants.testHomieThing, callback,
195                 new NodeAttributes());
196         Property property = spy(
197                 new Property(DEVICE_TOPIC + "/testnode", node, "temperature", callback, new PropertyAttributes()));
198
199         // Create a scheduler
200         ScheduledExecutorService scheduler = new ScheduledThreadPoolExecutor(4);
201
202         property.subscribe(connection, scheduler, 500).get();
203
204         assertThat(property.attributes.settable, is(true));
205         assertThat(property.attributes.retained, is(true));
206         assertThat(property.attributes.name, is("Testprop"));
207         assertThat(property.attributes.unit, is("°C"));
208         assertThat(property.attributes.datatype, is(DataTypeEnum.float_));
209         waitForAssert(() -> assertThat(property.attributes.format, is("-100:100")));
210         verify(property, timeout(500).atLeastOnce()).attributesReceived();
211
212         // Receive property value
213         ChannelState channelState = spy(property.getChannelState());
214         PropertyHelper.setChannelState(property, channelState);
215
216         property.startChannel(connection, scheduler, 500).get();
217         verify(channelState).start(any(), any(), anyInt());
218         verify(channelState, timeout(500)).processMessage(any(), any());
219         verify(callback).updateChannelState(any(), any());
220
221         assertThat(property.getChannelState().getCache().getChannelState(), is(new DecimalType(10)));
222
223         property.stop().get();
224         assertThat(connection.hasSubscribers(), is(false));
225     }
226
227     // Inject a spy'ed property
228     public Property createSpyProperty(InvocationOnMock invocation) {
229         final Node node = (Node) invocation.getMock();
230         final String id = (String) invocation.getArguments()[0];
231         return spy(node.createProperty(id, spy(new PropertyAttributes())));
232     }
233
234     // Inject a spy'ed node
235     public Node createSpyNode(InvocationOnMock invocation) {
236         final Device device = (Device) invocation.getMock();
237         final String id = (String) invocation.getArguments()[0];
238         // Create the node
239         Node node = spy(device.createNode(id, spy(new NodeAttributes())));
240         // Intercept creating a property in the next call and inject a spy'ed property.
241         doAnswer(this::createSpyProperty).when(node).createProperty(any());
242         return node;
243     }
244
245     @SuppressWarnings("null")
246     @Test
247     public void parseHomieTree() throws InterruptedException, ExecutionException, TimeoutException {
248         // Create a Homie Device object. Because spied Nodes are required for call verification,
249         // the full Device constructor need to be used and a ChildMap object need to be created manually.
250         ChildMap<Node> nodeMap = new ChildMap<>();
251         Device device = spy(
252                 new Device(ThingChannelConstants.testHomieThing, callback, new DeviceAttributes(), nodeMap));
253
254         // Intercept creating a node in initialize()->start() and inject a spy'ed node.
255         doAnswer(this::createSpyNode).when(device).createNode(any());
256
257         // initialize the device, subscribe and wait.
258         device.initialize(BASE_TOPIC, DEVICE_ID, Collections.emptyList());
259         device.subscribe(connection, scheduler, 1500).get();
260
261         assertThat(device.isInitialized(), is(true));
262
263         // Check device attributes
264         assertThat(device.attributes.homie, is("3.0"));
265         assertThat(device.attributes.name, is("Name"));
266         assertThat(device.attributes.state, is(ReadyState.ready));
267         assertThat(device.attributes.nodes.length, is(1));
268         verify(device, times(4)).attributeChanged(any(), any(), any(), any(), anyBoolean());
269         verify(callback).readyStateChanged(eq(ReadyState.ready));
270         verify(device).attributesReceived(any(), any(), anyInt());
271
272         // Expect 1 node
273         assertThat(device.nodes.size(), is(1));
274
275         // Check node and node attributes
276         Node node = device.nodes.get("testnode");
277         verify(node).subscribe(any(), any(), anyInt());
278         verify(node).attributesReceived(any(), any(), anyInt());
279         verify(node.attributes).subscribeAndReceive(any(), any(), anyString(), any(), anyInt());
280         assertThat(node.attributes.type, is("Type"));
281         assertThat(node.attributes.name, is("Testnode"));
282
283         // Expect 2 property
284         assertThat(node.properties.size(), is(3));
285
286         // Check property and property attributes
287         Property property = node.properties.get("temperature");
288         assertThat(property.attributes.settable, is(true));
289         assertThat(property.attributes.retained, is(true));
290         assertThat(property.attributes.name, is("Testprop"));
291         assertThat(property.attributes.unit, is("°C"));
292         assertThat(property.attributes.datatype, is(DataTypeEnum.float_));
293         assertThat(property.attributes.format, is("-100:100"));
294         verify(property).attributesReceived();
295         assertNotNull(property.getChannelState());
296         assertThat(property.getType().getState().getMinimum().intValue(), is(-100));
297         assertThat(property.getType().getState().getMaximum().intValue(), is(100));
298
299         // Check property and property attributes
300         Property propertyBell = node.properties.get("doorbell");
301         verify(propertyBell).attributesReceived();
302         assertThat(propertyBell.attributes.settable, is(false));
303         assertThat(propertyBell.attributes.retained, is(false));
304         assertThat(propertyBell.attributes.name, is("Doorbell"));
305         assertThat(propertyBell.attributes.datatype, is(DataTypeEnum.boolean_));
306
307         // The device->node->property tree is ready. Now subscribe to property values.
308         device.startChannels(connection, scheduler, 50, handler).get();
309         assertThat(propertyBell.getChannelState().isStateful(), is(false));
310         assertThat(propertyBell.getChannelState().getCache().getChannelState(), is(UnDefType.UNDEF));
311         assertThat(property.getChannelState().getCache().getChannelState(), is(new DecimalType(10)));
312
313         property = node.properties.get("testRetain");
314         WaitForTopicValue watcher = new WaitForTopicValue(embeddedConnection, propertyTestTopic + "/set");
315         // Watch the topic. Publish a retain=false value to MQTT
316         property.getChannelState().publishValue(OnOffType.OFF).get();
317         assertThat(watcher.waitForTopicValue(1000), is("false"));
318
319         // Publish a retain=false value to MQTT.
320         property.getChannelState().publishValue(OnOffType.ON).get();
321         // No value is expected to be retained on this MQTT topic
322         waitForAssert(() -> {
323             WaitForTopicValue w = new WaitForTopicValue(embeddedConnection, propertyTestTopic + "/set");
324             assertNull(w.waitForTopicValue(50));
325         }, 500, 100);
326     }
327 }