2 * Copyright (c) 2010-2022 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.mqtt;
15 import static org.hamcrest.CoreMatchers.is;
16 import static org.hamcrest.MatcherAssert.assertThat;
17 import static org.junit.jupiter.api.Assertions.*;
18 import static org.mockito.ArgumentMatchers.*;
19 import static org.mockito.Mockito.*;
21 import java.nio.charset.StandardCharsets;
22 import java.util.ArrayList;
23 import java.util.Collections;
24 import java.util.List;
25 import java.util.concurrent.CompletableFuture;
26 import java.util.concurrent.CountDownLatch;
27 import java.util.concurrent.ExecutionException;
28 import java.util.concurrent.ScheduledExecutorService;
29 import java.util.concurrent.ScheduledThreadPoolExecutor;
30 import java.util.concurrent.TimeUnit;
31 import java.util.concurrent.TimeoutException;
33 import org.eclipse.jdt.annotation.NonNullByDefault;
34 import org.junit.jupiter.api.AfterEach;
35 import org.junit.jupiter.api.BeforeEach;
36 import org.junit.jupiter.api.Test;
37 import org.junit.jupiter.api.extension.ExtendWith;
38 import org.mockito.Mock;
39 import org.mockito.invocation.InvocationOnMock;
40 import org.mockito.junit.jupiter.MockitoExtension;
41 import org.mockito.junit.jupiter.MockitoSettings;
42 import org.mockito.quality.Strictness;
43 import org.openhab.binding.mqtt.generic.ChannelState;
44 import org.openhab.binding.mqtt.generic.tools.ChildMap;
45 import org.openhab.binding.mqtt.generic.tools.WaitForTopicValue;
46 import org.openhab.binding.mqtt.homie.internal.handler.HomieThingHandler;
47 import org.openhab.binding.mqtt.homie.internal.homie300.Device;
48 import org.openhab.binding.mqtt.homie.internal.homie300.DeviceAttributes;
49 import org.openhab.binding.mqtt.homie.internal.homie300.DeviceAttributes.ReadyState;
50 import org.openhab.binding.mqtt.homie.internal.homie300.DeviceCallback;
51 import org.openhab.binding.mqtt.homie.internal.homie300.Node;
52 import org.openhab.binding.mqtt.homie.internal.homie300.NodeAttributes;
53 import org.openhab.binding.mqtt.homie.internal.homie300.Property;
54 import org.openhab.binding.mqtt.homie.internal.homie300.PropertyAttributes;
55 import org.openhab.binding.mqtt.homie.internal.homie300.PropertyAttributes.DataTypeEnum;
56 import org.openhab.binding.mqtt.homie.internal.homie300.PropertyHelper;
57 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
58 import org.openhab.core.io.transport.mqtt.MqttConnectionObserver;
59 import org.openhab.core.io.transport.mqtt.MqttConnectionState;
60 import org.openhab.core.io.transport.mqtt.MqttService;
61 import org.openhab.core.library.types.DecimalType;
62 import org.openhab.core.library.types.OnOffType;
63 import org.openhab.core.test.java.JavaOSGiTest;
64 import org.openhab.core.types.UnDefType;
65 import org.osgi.service.cm.ConfigurationAdmin;
68 * A full implementation test, that starts the embedded MQTT broker and publishes a homie device tree.
70 * @author David Graeff - Initial contribution
72 @ExtendWith(MockitoExtension.class)
73 @MockitoSettings(strictness = Strictness.LENIENT)
75 public class HomieImplementationTest extends JavaOSGiTest {
76 private static final String BASE_TOPIC = "homie";
77 private static final String DEVICE_ID = ThingChannelConstants.testHomieThing.getId();
78 private static final String DEVICE_TOPIC = BASE_TOPIC + "/" + DEVICE_ID;
80 private @NonNullByDefault({}) ConfigurationAdmin configurationAdmin;
81 private @NonNullByDefault({}) MqttService mqttService;
82 private @NonNullByDefault({}) MqttBrokerConnection embeddedConnection;
83 private @NonNullByDefault({}) MqttBrokerConnection connection;
84 private int registeredTopics = 100;
86 // The handler is not tested here, so just mock the callback
87 private @Mock @NonNullByDefault({}) DeviceCallback callback;
89 // A handler mock is required to verify that channel value changes have been received
90 private @Mock @NonNullByDefault({}) HomieThingHandler handler;
92 private @NonNullByDefault({}) ScheduledExecutorService scheduler;
95 * Create an observer that fails the test as soon as the broker client connection changes its connection state
96 * to something else then CONNECTED.
98 private MqttConnectionObserver failIfChange = (state, error) -> assertThat(state,
99 is(MqttConnectionState.CONNECTED));
101 private String propertyTestTopic = "";
104 public void beforeEach() throws Exception {
105 registerVolatileStorageService();
106 configurationAdmin = getService(ConfigurationAdmin.class);
107 mqttService = getService(MqttService.class);
109 // Wait for the EmbeddedBrokerService internal connection to be connected
110 embeddedConnection = new EmbeddedBrokerTools(configurationAdmin, mqttService).waitForConnection();
111 embeddedConnection.setQos(1);
113 connection = new MqttBrokerConnection(embeddedConnection.getHost(), embeddedConnection.getPort(),
114 embeddedConnection.isSecure(), "homie");
115 connection.setQos(1);
116 connection.start().get(5, TimeUnit.SECONDS);
117 assertThat(connection.connectionState(), is(MqttConnectionState.CONNECTED));
118 // If the connection state changes in between -> fail
119 connection.addConnectionObserver(failIfChange);
121 List<CompletableFuture<Boolean>> futures = new ArrayList<>();
122 futures.add(publish(DEVICE_TOPIC + "/$homie", "3.0"));
123 futures.add(publish(DEVICE_TOPIC + "/$name", "Name"));
124 futures.add(publish(DEVICE_TOPIC + "/$state", "ready"));
125 futures.add(publish(DEVICE_TOPIC + "/$nodes", "testnode"));
127 // Add homie node topics
128 final String testNode = DEVICE_TOPIC + "/testnode";
129 futures.add(publish(testNode + "/$name", "Testnode"));
130 futures.add(publish(testNode + "/$type", "Type"));
131 futures.add(publish(testNode + "/$properties", "temperature,doorbell,testRetain"));
133 // Add homie property topics
134 final String property = testNode + "/temperature";
135 futures.add(publish(property, "10"));
136 futures.add(publish(property + "/$name", "Testprop"));
137 futures.add(publish(property + "/$settable", "true"));
138 futures.add(publish(property + "/$unit", "°C"));
139 futures.add(publish(property + "/$datatype", "float"));
140 futures.add(publish(property + "/$format", "-100:100"));
142 final String propertyBellTopic = testNode + "/doorbell";
143 futures.add(publish(propertyBellTopic + "/$name", "Doorbell"));
144 futures.add(publish(propertyBellTopic + "/$settable", "false"));
145 futures.add(publish(propertyBellTopic + "/$retained", "false"));
146 futures.add(publish(propertyBellTopic + "/$datatype", "boolean"));
148 this.propertyTestTopic = testNode + "/testRetain";
149 futures.add(publish(propertyTestTopic + "/$name", "Test"));
150 futures.add(publish(propertyTestTopic + "/$settable", "true"));
151 futures.add(publish(propertyTestTopic + "/$retained", "false"));
152 futures.add(publish(propertyTestTopic + "/$datatype", "boolean"));
154 registeredTopics = futures.size();
155 CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(2, TimeUnit.SECONDS);
157 scheduler = new ScheduledThreadPoolExecutor(6);
160 private CompletableFuture<Boolean> publish(String topic, String message) {
161 return embeddedConnection.publish(topic, message.getBytes(StandardCharsets.UTF_8), 0, true);
165 public void afterEach() throws Exception {
166 if (connection != null) {
167 connection.removeConnectionObserver(failIfChange);
168 connection.stop().get(2, TimeUnit.SECONDS);
170 if (scheduler != null) {
171 scheduler.shutdownNow();
176 public void retrieveAllTopics() throws InterruptedException, ExecutionException, TimeoutException {
177 // four topics are not under /testnode !
178 CountDownLatch c = new CountDownLatch(registeredTopics - 4);
179 connection.subscribe(DEVICE_TOPIC + "/testnode/#", (topic, payload) -> c.countDown()).get(5, TimeUnit.SECONDS);
180 assertTrue(c.await(5, TimeUnit.SECONDS),
181 "Connection " + connection.getClientId() + " not retrieving all topics ");
185 public void retrieveOneAttribute() throws InterruptedException, ExecutionException {
186 WaitForTopicValue watcher = new WaitForTopicValue(connection, DEVICE_TOPIC + "/$homie");
187 assertThat(watcher.waitForTopicValue(1000), is("3.0"));
190 @SuppressWarnings("null")
192 public void retrieveAttributes() throws InterruptedException, ExecutionException {
193 assertThat(connection.hasSubscribers(), is(false));
195 Node node = new Node(DEVICE_TOPIC, "testnode", ThingChannelConstants.testHomieThing, callback,
196 new NodeAttributes());
197 Property property = spy(
198 new Property(DEVICE_TOPIC + "/testnode", node, "temperature", callback, new PropertyAttributes()));
200 // Create a scheduler
201 ScheduledExecutorService scheduler = new ScheduledThreadPoolExecutor(4);
203 property.subscribe(connection, scheduler, 500).get();
205 assertThat(property.attributes.settable, is(true));
206 assertThat(property.attributes.retained, is(true));
207 assertThat(property.attributes.name, is("Testprop"));
208 assertThat(property.attributes.unit, is("°C"));
209 assertThat(property.attributes.datatype, is(DataTypeEnum.float_));
210 waitForAssert(() -> assertThat(property.attributes.format, is("-100:100")));
211 verify(property, timeout(500).atLeastOnce()).attributesReceived();
213 // Receive property value
214 ChannelState channelState = spy(property.getChannelState());
215 PropertyHelper.setChannelState(property, channelState);
217 property.startChannel(connection, scheduler, 500).get();
218 verify(channelState).start(any(), any(), anyInt());
219 verify(channelState, timeout(500)).processMessage(any(), any());
220 verify(callback).updateChannelState(any(), any());
222 assertThat(property.getChannelState().getCache().getChannelState(), is(new DecimalType(10)));
224 property.stop().get();
225 assertThat(connection.hasSubscribers(), is(false));
228 // Inject a spy'ed property
229 public Property createSpyProperty(InvocationOnMock invocation) {
230 final Node node = (Node) invocation.getMock();
231 final String id = (String) invocation.getArguments()[0];
232 return spy(node.createProperty(id, spy(new PropertyAttributes())));
235 // Inject a spy'ed node
236 public Node createSpyNode(InvocationOnMock invocation) {
237 final Device device = (Device) invocation.getMock();
238 final String id = (String) invocation.getArguments()[0];
240 Node node = spy(device.createNode(id, spy(new NodeAttributes())));
241 // Intercept creating a property in the next call and inject a spy'ed property.
242 doAnswer(this::createSpyProperty).when(node).createProperty(any());
246 @SuppressWarnings("null")
248 public void parseHomieTree() throws InterruptedException, ExecutionException, TimeoutException {
249 // Create a Homie Device object. Because spied Nodes are required for call verification,
250 // the full Device constructor need to be used and a ChildMap object need to be created manually.
251 ChildMap<Node> nodeMap = new ChildMap<>();
253 new Device(ThingChannelConstants.testHomieThing, callback, new DeviceAttributes(), nodeMap));
255 // Intercept creating a node in initialize()->start() and inject a spy'ed node.
256 doAnswer(this::createSpyNode).when(device).createNode(any());
258 // initialize the device, subscribe and wait.
259 device.initialize(BASE_TOPIC, DEVICE_ID, Collections.emptyList());
260 device.subscribe(connection, scheduler, 1500).get();
262 assertThat(device.isInitialized(), is(true));
264 // Check device attributes
265 assertThat(device.attributes.homie, is("3.0"));
266 assertThat(device.attributes.name, is("Name"));
267 assertThat(device.attributes.state, is(ReadyState.ready));
268 assertThat(device.attributes.nodes.length, is(1));
269 verify(device, times(4)).attributeChanged(any(), any(), any(), any(), anyBoolean());
270 verify(callback).readyStateChanged(eq(ReadyState.ready));
271 verify(device).attributesReceived(any(), any(), anyInt());
274 assertThat(device.nodes.size(), is(1));
276 // Check node and node attributes
277 Node node = device.nodes.get("testnode");
278 verify(node).subscribe(any(), any(), anyInt());
279 verify(node).attributesReceived(any(), any(), anyInt());
280 verify(node.attributes).subscribeAndReceive(any(), any(), anyString(), any(), anyInt());
281 assertThat(node.attributes.type, is("Type"));
282 assertThat(node.attributes.name, is("Testnode"));
285 assertThat(node.properties.size(), is(3));
287 // Check property and property attributes
288 Property property = node.properties.get("temperature");
289 assertThat(property.attributes.settable, is(true));
290 assertThat(property.attributes.retained, is(true));
291 assertThat(property.attributes.name, is("Testprop"));
292 assertThat(property.attributes.unit, is("°C"));
293 assertThat(property.attributes.datatype, is(DataTypeEnum.float_));
294 assertThat(property.attributes.format, is("-100:100"));
295 verify(property).attributesReceived();
296 assertNotNull(property.getChannelState());
297 assertThat(property.getType().getState().getMinimum().intValue(), is(-100));
298 assertThat(property.getType().getState().getMaximum().intValue(), is(100));
300 // Check property and property attributes
301 Property propertyBell = node.properties.get("doorbell");
302 verify(propertyBell).attributesReceived();
303 assertThat(propertyBell.attributes.settable, is(false));
304 assertThat(propertyBell.attributes.retained, is(false));
305 assertThat(propertyBell.attributes.name, is("Doorbell"));
306 assertThat(propertyBell.attributes.datatype, is(DataTypeEnum.boolean_));
308 // The device->node->property tree is ready. Now subscribe to property values.
309 device.startChannels(connection, scheduler, 50, handler).get();
310 assertThat(propertyBell.getChannelState().isStateful(), is(false));
311 assertThat(propertyBell.getChannelState().getCache().getChannelState(), is(UnDefType.UNDEF));
312 assertThat(property.getChannelState().getCache().getChannelState(), is(new DecimalType(10)));
314 property = node.properties.get("testRetain");
315 WaitForTopicValue watcher = new WaitForTopicValue(embeddedConnection, propertyTestTopic + "/set");
316 // Watch the topic. Publish a retain=false value to MQTT
317 property.getChannelState().publishValue(OnOffType.OFF).get();
318 assertThat(watcher.waitForTopicValue(1000), is("false"));
320 // Publish a retain=false value to MQTT.
321 property.getChannelState().publishValue(OnOffType.ON).get();
322 // No value is expected to be retained on this MQTT topic
323 waitForAssert(() -> {
324 WaitForTopicValue w = new WaitForTopicValue(embeddedConnection, propertyTestTopic + "/set");
325 assertNull(w.waitForTopicValue(50));