]> git.basschouten.com Git - openhab-addons.git/blob
6022125724452b6195d1ac060df7f1010e6e9819
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2023 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.mqtt.ruuvigateway;
14
15 import static org.hamcrest.CoreMatchers.is;
16 import static org.hamcrest.MatcherAssert.assertThat;
17 import static org.junit.jupiter.api.Assertions.*;
18 import static org.mockito.ArgumentMatchers.any;
19 import static org.mockito.Mockito.when;
20 import static org.openhab.binding.mqtt.ruuvigateway.internal.RuuviGatewayBindingConstants.*;
21 import static org.openhab.core.library.unit.MetricPrefix.HECTO;
22
23 import java.lang.reflect.InvocationTargetException;
24 import java.lang.reflect.Method;
25 import java.math.BigDecimal;
26 import java.time.Instant;
27 import java.time.ZoneId;
28 import java.util.ArrayList;
29 import java.util.HashMap;
30 import java.util.HashSet;
31 import java.util.List;
32 import java.util.Map;
33 import java.util.Objects;
34 import java.util.Set;
35 import java.util.concurrent.CompletableFuture;
36 import java.util.concurrent.CountDownLatch;
37 import java.util.concurrent.ScheduledExecutorService;
38 import java.util.concurrent.ScheduledThreadPoolExecutor;
39 import java.util.concurrent.TimeUnit;
40 import java.util.concurrent.atomic.AtomicInteger;
41 import java.util.function.Function;
42 import java.util.stream.Collectors;
43
44 import javax.measure.quantity.Acceleration;
45 import javax.measure.quantity.Dimensionless;
46 import javax.measure.quantity.ElectricPotential;
47 import javax.measure.quantity.Power;
48 import javax.measure.quantity.Pressure;
49 import javax.measure.quantity.Temperature;
50
51 import org.eclipse.jdt.annotation.NonNullByDefault;
52 import org.eclipse.jdt.annotation.Nullable;
53 import org.junit.jupiter.api.AfterEach;
54 import org.junit.jupiter.api.BeforeEach;
55 import org.junit.jupiter.api.Test;
56 import org.junit.jupiter.api.extension.ExtendWith;
57 import org.junit.jupiter.params.ParameterizedTest;
58 import org.junit.jupiter.params.provider.CsvSource;
59 import org.junit.jupiter.params.provider.ValueSource;
60 import org.mockito.Mock;
61 import org.mockito.junit.jupiter.MockitoExtension;
62 import org.mockito.junit.jupiter.MockitoSettings;
63 import org.mockito.quality.Strictness;
64 import org.openhab.binding.mqtt.discovery.MQTTTopicDiscoveryService;
65 import org.openhab.binding.mqtt.ruuvigateway.internal.RuuviGatewayBindingConstants;
66 import org.openhab.binding.mqtt.ruuvigateway.internal.discovery.RuuviGatewayDiscoveryService;
67 import org.openhab.binding.mqtt.ruuvigateway.internal.handler.RuuviTagHandler;
68 import org.openhab.core.config.core.Configuration;
69 import org.openhab.core.i18n.UnitProvider;
70 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
71 import org.openhab.core.io.transport.mqtt.MqttConnectionObserver;
72 import org.openhab.core.io.transport.mqtt.MqttConnectionState;
73 import org.openhab.core.items.GenericItem;
74 import org.openhab.core.library.CoreItemFactory;
75 import org.openhab.core.library.types.DateTimeType;
76 import org.openhab.core.library.types.DecimalType;
77 import org.openhab.core.library.types.QuantityType;
78 import org.openhab.core.library.types.StringType;
79 import org.openhab.core.library.unit.SIUnits;
80 import org.openhab.core.library.unit.Units;
81 import org.openhab.core.thing.Bridge;
82 import org.openhab.core.thing.Channel;
83 import org.openhab.core.thing.ChannelUID;
84 import org.openhab.core.thing.Thing;
85 import org.openhab.core.thing.ThingStatus;
86 import org.openhab.core.thing.ThingStatusDetail;
87 import org.openhab.core.thing.ThingStatusInfo;
88 import org.openhab.core.thing.ThingTypeUID;
89 import org.openhab.core.thing.ThingUID;
90 import org.openhab.core.thing.binding.ThingHandler;
91 import org.openhab.core.thing.binding.builder.BridgeBuilder;
92 import org.openhab.core.thing.binding.builder.ChannelBuilder;
93 import org.openhab.core.thing.binding.builder.ThingBuilder;
94 import org.openhab.core.thing.link.ItemChannelLink;
95 import org.openhab.core.types.State;
96 import org.openhab.core.types.UnDefType;
97
98 /**
99  * A full implementation test, that starts the embedded MQTT broker and publishes test data
100  *
101  * @author David Graeff - Initial contribution
102  * @author Sami Salonen - Adapted and extended to Ruuvi Gateway tests
103  */
104 @NonNullByDefault
105 @ExtendWith(MockitoExtension.class)
106 @MockitoSettings(strictness = Strictness.LENIENT)
107 public class RuuviGatewayTest extends MqttOSGiTest {
    // Mocked unit provider; stubbed in setup() and passed to CoreItemFactory when test items are created
    protected @Mock @NonNullByDefault({}) UnitProvider mockedUnitProvider;
    // Topic prefix used for the MQTT topics published and subscribed to by these tests
    private static final String BASE_TOPIC_RUUVI = "ruuvi";
    // Channel id -> item type of the item that linkChannelToAutogeneratedItem() creates for that channel.
    // The key set also defines which channels createRuuviThing() adds to the test thing.
    private static final Map<String, String> CHANNEL_TO_ITEM_TYPE = new HashMap<>();
    static {
        CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_ACCELERATIONX, "Number:Acceleration");
        CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_ACCELERATIONY, "Number:Acceleration");
        CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_ACCELERATIONZ, "Number:Acceleration");
        CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_BATTERY, "Number:ElectricPotential");
        CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_DATA_FORMAT, "Number");
        CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_HUMIDITY, "Number:Dimensionless");
        CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_MEASUREMENT_SEQUENCE_NUMBER, "Number");
        CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_MOVEMENT_COUNTER, "Number");
        CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_PRESSURE, "Number:Pressure");
        CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_TEMPERATURE, "Number:Temperature");
        CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_TX_POWER, "Number:Power");
        CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_RSSI, "Number:Power");
        CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_TS, "DateTime");
        CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_GWTS, "DateTime");
        CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_GWMAC, "String");
    }

    // Collects thing status updates per thing UID; registered as a service in beforeEach()
    private ThingStatusInfoChangedSubscriber statusSubscriber = new ThingStatusInfoChangedSubscriber();
    // Client connection to the test broker, created in beforeEach() and stopped in afterEach()
    private @NonNullByDefault({}) MqttBrokerConnection mqttConnection;
    // Number of topics published in beforeEach(); the initial value is overwritten there
    private int registeredTopics = 100;

    // Executor used by the tests to publish MQTT messages asynchronously; created in beforeEach()
    private @NonNullByDefault({}) ScheduledExecutorService scheduler;

    /**
     * Create an observer that fails the test as soon as the broker client connection changes its connection state
     * to something else than CONNECTED.
     */
    private MqttConnectionObserver failIfChange = (state, error) -> assertThat(state,
            is(MqttConnectionState.CONNECTED));

    @SuppressWarnings("unused") // used indirectly with Inbox
    private @NonNullByDefault({}) RuuviGatewayDiscoveryService ruuviDiscoveryService;
    // Things (including the broker bridge) registered by the create* helpers of this test
    private Set<Thing> things = new HashSet<>();
145
146     @BeforeEach
147     public void setup() {
148         when(mockedUnitProvider.getUnit(any())).then(i -> {
149             Class clazz = i.getArgument(0);
150             if (Temperature.class.equals(clazz)) {
151                 return SIUnits.CELSIUS;
152             } else if (Acceleration.class.equals(clazz)) {
153                 return Units.METRE_PER_SQUARE_SECOND;
154             } else if (Dimensionless.class.equals(clazz)) {
155                 return Units.ONE;
156             } else if (ElectricPotential.class.equals(clazz)) {
157                 return Units.VOLT;
158             } else if (Pressure.class.equals(clazz)) {
159                 return HECTO(SIUnits.PASCAL);
160             } else if (Power.class.equals(clazz)) {
161                 return Units.WATT;
162             }
163             return null;
164         });
165     }
166
167     private Bridge createMqttBrokerBridge() {
168         Configuration configuration = new Configuration();
169         configuration.put("host", "127.0.0.1");
170         configuration.put("port", brokerConnection.getPort());
171         Bridge bridge = BridgeBuilder.create(new ThingTypeUID("mqtt", "broker"), "mybroker").withLabel("MQTT Broker")
172                 .withConfiguration(configuration).build();
173         thingProvider.add(bridge);
174         waitForAssert(() -> assertNotNull(bridge.getHandler()));
175         assertNotNull(bridge.getConfiguration());
176         things.add(bridge);
177         return bridge;
178     }
179
180     private Thing createRuuviThing(String brokerPrefix, String topic, @Nullable Integer timeoutMillisecs) {
181         Configuration configuration = new Configuration();
182         configuration.put(RuuviGatewayBindingConstants.CONFIGURATION_PROPERTY_TOPIC, topic);
183         if (timeoutMillisecs != null) {
184             configuration.put(RuuviGatewayBindingConstants.CONFIGURATION_PROPERTY_TIMEOUT, timeoutMillisecs);
185         }
186         ThingUID bridgeThingUID = new ThingUID("mqtt", "broker", "mybroker");
187         ThingUID thingUID = new ThingUID(RuuviGatewayBindingConstants.THING_TYPE_BEACON,
188                 topic.replaceAll("[:_/]", "_"));
189         ThingBuilder thingBuilder = ThingBuilder.create(RuuviGatewayBindingConstants.THING_TYPE_BEACON, thingUID)
190                 .withBridge(bridgeThingUID).withLabel("Ruuvi " + topic).withConfiguration(configuration);
191
192         CHANNEL_TO_ITEM_TYPE.forEach((channelId, _itemType) -> {
193             thingBuilder.withChannel(ChannelBuilder.create(new ChannelUID(thingUID, channelId)).build());
194         });
195
196         Thing thing = thingBuilder.build();
197         thingProvider.add(thing);
198         waitForAssert(() -> assertNotNull(thing.getHandler()));
199         assertNotNull(thing.getConfiguration());
200         things.add(thing);
201         return thing;
202     }
203
204     private void triggerTimeoutHandling(Thing ruuviThing) {
205         // Simulate some time passing, so that RuuviTagHandler.heartbeat() is called twice
206         // Two heartbeat calls happens to trigger timeout handling in handler, one is not enough.
207         // (this is really implementation detail of RuuviTagHandler, making this test slightly
208         // error prone to possible changes in RuuviTagHandler implementation)
209         //
210         // 0. Assume some data received already, RuuviTagHandler.receivedData is true
211         // 1. First heartbeat sets receivedData=false; no further action is taken yet
212         // 2. Second heartbeat acts on false receivedData, e.g. updating Thing Status
213         for (int i = 0; i < 2; i++) {
214             callInternalHeartbeat(ruuviThing);
215         }
216     }
217
218     private void callInternalHeartbeat(Thing ruuviThing) {
219         ThingHandler handler = ruuviThing.getHandler();
220         Objects.requireNonNull(handler);
221         assertInstanceOf(RuuviTagHandler.class, handler);
222         RuuviTagHandler ruuviHandler = (RuuviTagHandler) handler;
223         try {
224             Method heartbeatMethod = RuuviTagHandler.class.getDeclaredMethod("heartbeat");
225             Objects.requireNonNull(heartbeatMethod);
226             heartbeatMethod.setAccessible(true);
227             heartbeatMethod.invoke(ruuviHandler);
228         } catch (NoSuchMethodException | SecurityException | IllegalAccessException | IllegalArgumentException
229                 | InvocationTargetException e) {
230             fail("Failed to call heartbeat method of thing handler via reflection. Bug in test? Details: "
231                     + e.getClass().getSimpleName() + ": " + e.getMessage());
232             throw new RuntimeException(e);
233         }
234     }
235
236     private String itemName(ChannelUID channelUID) {
237         return channelUID.getAsString().replace(":", "_");
238     }
239
240     private String linkChannelToAutogeneratedItem(ChannelUID channelUID) {
241         String itemName = itemName(channelUID);
242         String itemType = CHANNEL_TO_ITEM_TYPE.get(channelUID.getId());
243         GenericItem item = new CoreItemFactory(mockedUnitProvider).createItem(itemType, itemName);
244         assertNotNull(item, itemType);
245         itemProvider.add(item);
246         itemChannelLinkProvider.add(new ItemChannelLink(itemName, channelUID));
247         return itemName;
248     }
249
    /**
     * Prepares the test environment: runs the parent set-up, registers the status subscriber, creates the
     * discovery service and the broker bridge, opens a client connection to the broker, publishes one initial
     * message below the ruuvi topic prefix and creates the scheduler used for asynchronous publishing.
     *
     * @throws Exception if the parent set-up fails or the initial publish does not complete within 5 seconds
     */
    @Override
    @BeforeEach
    public void beforeEach() throws Exception {
        super.beforeEach();

        // Start each test without status updates recorded earlier
        statusSubscriber.statusUpdates.clear();
        registerService(statusSubscriber);

        MQTTTopicDiscoveryService mqttTopicDiscoveryService = getService(MQTTTopicDiscoveryService.class);
        assertNotNull(mqttTopicDiscoveryService);
        ruuviDiscoveryService = new RuuviGatewayDiscoveryService(mqttTopicDiscoveryService);

        createMqttBrokerBridge();

        mqttConnection = createBrokerConnection("myclientid");

        // If the connection state changes in between -> fail
        mqttConnection.addConnectionObserver(failIfChange);

        // Publish the initial message(s); testDiscovery expects this topic to be discovered as well
        List<CompletableFuture<Boolean>> futures = new ArrayList<>();
        futures.add(publish(BASE_TOPIC_RUUVI + "/mygwid/DE:AD:BE:EF:AA:00", "{}"));

        // Number of messages retrieveAllRuuviPrefixedTopics() waits for
        registeredTopics = futures.size();
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(5, TimeUnit.SECONDS);

        scheduler = new ScheduledThreadPoolExecutor(6);
    }
277
278     @Override
279     @AfterEach
280     public void afterEach() throws Exception {
281         if (mqttConnection != null) {
282             mqttConnection.removeConnectionObserver(failIfChange);
283             mqttConnection.stop().get(5, TimeUnit.SECONDS);
284         }
285         things.stream().map(thing -> thingProvider.remove(thing.getUID()));
286         unregisterService(statusSubscriber);
287
288         if (scheduler != null) {
289             scheduler.shutdownNow();
290         }
291         super.afterEach();
292     }
293
294     @Test
295     public void retrieveAllRuuviPrefixedTopics() throws Exception {
296         CountDownLatch c = new CountDownLatch(registeredTopics);
297         mqttConnection.subscribe(BASE_TOPIC_RUUVI + "/#", (topic, payload) -> c.countDown()).get(5, TimeUnit.SECONDS);
298         assertTrue(c.await(5, TimeUnit.SECONDS),
299                 "Connection " + mqttConnection.getClientId() + " not retrieving all topics ");
300     }
301
302     private void assertThingStatus(List<ThingStatusInfo> statusUpdates, int index, ThingStatus status,
303             @Nullable ThingStatusDetail detail, @Nullable String description) {
304         assertTrue(statusUpdates.size() > index,
305                 String.format("Not enough status updates. Expected %d, but only had %d. Status updates received: %s",
306                         index + 1, statusUpdates.size(),
307                         statusUpdates.stream().map(ThingStatusInfo::getStatus).collect(Collectors.toList())));
308         assertEquals(status, statusUpdates.get(index).getStatus(), statusUpdates.get(index).toString());
309         assertEquals(detail, statusUpdates.get(index).getStatusDetail(), statusUpdates.get(index).toString());
310         assertEquals(description, statusUpdates.get(index).getDescription(), statusUpdates.get(index).toString());
311     }
312
313     @SuppressWarnings("null")
314     private void assertThingStatusWithDescriptionPattern(List<ThingStatusInfo> statusUpdates, int index,
315             ThingStatus status, ThingStatusDetail detail, String descriptionPattern) {
316         assertTrue(statusUpdates.size() > index, "assert " + statusUpdates.size() + " > " + index + " failed");
317         assertEquals(status, statusUpdates.get(index).getStatus(), statusUpdates.get(index).toString());
318         assertEquals(detail, statusUpdates.get(index).getStatusDetail(), statusUpdates.get(index).toString());
319         assertTrue(statusUpdates.get(index).getDescription().matches(descriptionPattern),
320                 statusUpdates.get(index).toString());
321     }
322
323     private void assertThingStatus(List<ThingStatusInfo> statusUpdates, int index, ThingStatus status) {
324         assertThingStatus(statusUpdates, index, status, ThingStatusDetail.NONE, null);
325     }
326
327     private void assertItems(Function<String, State> channelStateGetter, String temperatureCelsius,
328             String accelerationXStandardGravity, String accelerationYStandardGravity,
329             String accelerationZStandardGravity, String batteryVolt, int dataFormat, String humidityPercent,
330             int measurementSequenceNumber, int movementCounter, String pressurePascal, String txPowerDecibelMilliwatts,
331             String rssiDecibelMilliwatts, Instant ts, Instant gwts, String gwMac) {
332         assertEquals(new QuantityType<>(new BigDecimal(temperatureCelsius), SIUnits.CELSIUS),
333                 channelStateGetter.apply(CHANNEL_ID_TEMPERATURE));
334         assertEquals(new QuantityType<>(new BigDecimal(accelerationXStandardGravity), Units.STANDARD_GRAVITY),
335                 channelStateGetter.apply(CHANNEL_ID_ACCELERATIONX));
336         assertEquals(new QuantityType<>(new BigDecimal(accelerationYStandardGravity), Units.STANDARD_GRAVITY),
337                 channelStateGetter.apply(CHANNEL_ID_ACCELERATIONY));
338         assertEquals(new QuantityType<>(new BigDecimal(accelerationZStandardGravity), Units.STANDARD_GRAVITY),
339                 channelStateGetter.apply(CHANNEL_ID_ACCELERATIONZ));
340         assertEquals(new QuantityType<>(new BigDecimal(batteryVolt), Units.VOLT),
341                 channelStateGetter.apply(CHANNEL_ID_BATTERY));
342         assertEquals(new DecimalType(dataFormat), channelStateGetter.apply(CHANNEL_ID_DATA_FORMAT));
343         assertEquals(new QuantityType<>(new BigDecimal(humidityPercent), Units.PERCENT),
344                 channelStateGetter.apply(CHANNEL_ID_HUMIDITY));
345         assertEquals(new DecimalType(new BigDecimal(measurementSequenceNumber)),
346                 channelStateGetter.apply(CHANNEL_ID_MEASUREMENT_SEQUENCE_NUMBER));
347         assertEquals(new DecimalType(new BigDecimal(movementCounter)),
348                 channelStateGetter.apply(CHANNEL_ID_MOVEMENT_COUNTER));
349         assertEquals(new QuantityType<>(new BigDecimal(pressurePascal), SIUnits.PASCAL),
350                 channelStateGetter.apply(CHANNEL_ID_PRESSURE));
351         assertEquals(new QuantityType<>(new BigDecimal(txPowerDecibelMilliwatts), Units.DECIBEL_MILLIWATTS),
352                 channelStateGetter.apply(CHANNEL_ID_TX_POWER));
353
354         assertEquals(new QuantityType<>(new BigDecimal(rssiDecibelMilliwatts), Units.DECIBEL_MILLIWATTS),
355                 channelStateGetter.apply(CHANNEL_ID_RSSI));
356         assertEquals(new DateTimeType(ts.atZone(ZoneId.of("UTC"))), channelStateGetter.apply(CHANNEL_ID_TS));
357         assertEquals(new DateTimeType(gwts.atZone(ZoneId.of("UTC"))), channelStateGetter.apply(CHANNEL_ID_GWTS));
358         assertEquals(new StringType(gwMac), channelStateGetter.apply(CHANNEL_ID_GWMAC));
359     }
360
    /**
     * Verifies that payloads which cannot be parsed end in status OFFLINE / COMMUNICATION_ERROR with a
     * description matching ".*could not be parsed.*".
     *
     * Each test case is a "topic @ value" pair. If the value contains "{", it is used as the whole JSON payload;
     * otherwise it is placed into the "data" field of an otherwise valid JSON payload.
     *
     * NOTE(review): unlike in the other tests, no "/" separates BASE_TOPIC_RUUVI from "mygwid" in the topics
     * below, so they start with "ruuvimygwid/". The same topic is used for the thing configuration and for
     * publishing, so the test is consistent in itself - confirm whether this is intended.
     *
     * @param topic MQTT topic used for both the thing configuration and the published message
     * @param val complete JSON payload, or only the content of its "data" field
     */
    @ParameterizedTest
    @CsvSource(delimiter = '@', value = { //
            BASE_TOPIC_RUUVI + "mygwid/DE:AD:BE:EF:AA:01 @" + "{}", // empty json
            BASE_TOPIC_RUUVI + "mygwid/DE:AD:BE:EF:BB:03 @" + "invalid json", // invalid json
            BASE_TOPIC_RUUVI + "mygwid/DE:AD:BE:EF:BB:04 @" + "0201061BFF990405", // payload too short
            BASE_TOPIC_RUUVI + "mygwid/DE:AD:BE:EF:BB:05 @"
                    + "0201061BFF99050512FC5394C37C0004FFFC040CAC364200CDCBB8334C884F", // wrong manufacturer id (the
                                                                                        // two bytes after FF do not
                                                                                        // match 99 04)
            BASE_TOPIC_RUUVI + "mygwid/DE:AD:BE:EF:BB:06 @"
                    + "0201061BFA99040512FC5394C37C0004FFFC040CAC364200CDCBB8334C884F", // unexpected advertisement (no
                                                                                        // FF to indicate 'manufacturer
                                                                                        // specific' advertisement)
            BASE_TOPIC_RUUVI + "mygwid/DE:AD:BE:EF:BB:07 @" + "{" + "  \"gw_mac\": \"DE:AD:BE:EF:00\","
                    + "  \"rssi\": -82," + "  \"aoa\": [],"
                    // data field is number, not a string
                    + "  \"gwts\": \"1659365432\"," + "  \"ts\": \"1659365222\"," + "  \"data\": 999,"
                    + "  \"coords\": \"\" }", // wrong json data types
    })
    public void testInvalidCases(String topic, String val) throws Exception {
        final String jsonPayload;
        if (val.contains("{")) {
            // test argument is specifying the whole json payload
            jsonPayload = val;
        } else {
            // test argument is only specifying the data field in the json payload
            // Fill rest of the fields with some valid values
            jsonPayload = "{" + "  \"gw_mac\": \"DE:AD:BE:EF:00\"," + "  \"rssi\": -82," + "  \"aoa\": [],"
                    + "  \"gwts\": \"1659365432\"," + "  \"ts\": \"1659365222\"," + "  \"data\": \"" + val + "\","
                    + "  \"coords\": \"\" }";
        }

        // Thing is created with a timeout of 100 ms
        Thing ruuviThing = createRuuviThing("mygwid", topic, 100);
        waitForAssert(() -> {
            List<ThingStatusInfo> statusUpdates = statusSubscriber.statusUpdates.get(ruuviThing.getUID());
            assertNotNull(statusUpdates);
            int statusUpdateIndex = 0;
            assertThingStatus(statusUpdates, statusUpdateIndex++, ThingStatus.INITIALIZING);
            assertThingStatus(statusUpdates, statusUpdateIndex++, ThingStatus.UNKNOWN);
            assertThingStatus(statusUpdates, statusUpdateIndex++, ThingStatus.ONLINE, ThingStatusDetail.NONE,
                    "Waiting for initial data");
            assertThingStatus(statusUpdates, statusUpdateIndex++, ThingStatus.OFFLINE,
                    ThingStatusDetail.COMMUNICATION_ERROR, "No valid data received for some time");
            // The message is published only once the thing has gone OFFLINE. This statement is part of the
            // waitForAssert block, so it runs again on every attempt that gets this far.
            scheduler.execute(() -> publish(topic, jsonPayload));
            assertThingStatusWithDescriptionPattern(statusUpdates, statusUpdateIndex++, ThingStatus.OFFLINE,
                    ThingStatusDetail.COMMUNICATION_ERROR, ".*could not be parsed.*");
            // No further status updates are expected
            assertEquals(statusUpdateIndex, statusUpdates.size());
        });
    }
410
411     @SuppressWarnings("null")
412     @Test
413     public void testDiscovery() {
414         scheduler.execute(() -> publish(BASE_TOPIC_RUUVI + "/mygwid/DE:AD:BE:EF:BB:02", """
415                 {\
416                  "gw_mac": "DE:AD:BE:EF:00",\
417                  "rssi": -82,\
418                  "aoa": [],\
419                  "gwts": "1659365432",\
420                  "ts": "1659365222",\
421                  "data": "0201061BFF99040512FC5394C37C0004FFFC040CAC364200CDCBB8334C884F",\
422                  "coords": "" }"""));
423         waitForAssert(() -> {
424             assertEquals(2, inbox.getAll().size(), inbox.getAll().toString());
425             var discovered = new HashSet<>(inbox.getAll());
426             for (var result : discovered) {
427                 assertEquals(THING_TYPE_BEACON, result.getThingTypeUID());
428                 assertEquals("topic", result.getRepresentationProperty());
429                 Object topic = result.getProperties().get("topic");
430                 assertNotNull(topic);
431                 assertTrue(
432                         // published in this test
433                         (BASE_TOPIC_RUUVI + "/mygwid/DE:AD:BE:EF:BB:02").equals(topic)
434                                 // published in beforeEach
435                                 || (BASE_TOPIC_RUUVI + "/mygwid/DE:AD:BE:EF:AA:00")
436                                         .equals(result.getProperties().get("topic")));
437             }
438         });
439     }
440
    /**
     * Verifies the regular life cycle of a Ruuvi thing: initial status updates, item updates after two valid
     * messages and, with {@code quickTimeout}, going OFFLINE with UNDEF items whenever timeout handling is
     * triggered.
     *
     * {@code statusUpdateIndex} tracks how many status updates have been verified so far; at the end it must
     * equal the number of status updates received.
     *
     * @param quickTimeout whether timeout handling is triggered manually between the published messages
     */
    @ParameterizedTest
    @ValueSource(booleans = { true, false })
    public void testHappyFlow(boolean quickTimeout) {
        // with quickTimeout=false, heartbeat is effectively disabled. Thing will not "timeout" and go OFFLINE
        // with quickTimeout=true, timeout happens very fast. In CI we use infinite timeout and trigger timeout manually

        Thing ruuviThing = createRuuviThing("mygwid", BASE_TOPIC_RUUVI + "/mygwid/DE:AD:BE:EF:BB:02", 9_000_000);
        // Link all channels to freshly created items
        ruuviThing.getChannels().stream().map(Channel::getUID).forEach(this::linkChannelToAutogeneratedItem);

        // Looks up the current state of the item linked to the given channel id
        @SuppressWarnings("null")
        Function<String, State> getItemState = channelId -> itemRegistry
                .get(itemName(ruuviThing.getChannel(channelId).getUID())).getState();

        AtomicInteger statusUpdateIndex = new AtomicInteger();
        waitForAssert(() -> {
            List<ThingStatusInfo> statusUpdates = statusSubscriber.statusUpdates.get(ruuviThing.getUID());
            assertNotNull(statusUpdates);

            assertThingStatus(statusUpdates, statusUpdateIndex.get(), ThingStatus.INITIALIZING);
            assertThingStatus(statusUpdates, statusUpdateIndex.get() + 1, ThingStatus.UNKNOWN);
            assertThingStatus(statusUpdates, statusUpdateIndex.get() + 2, ThingStatus.ONLINE, ThingStatusDetail.NONE,
                    "Waiting for initial data");

            // Advance the index only after all three assertions passed
            statusUpdateIndex.set(statusUpdateIndex.get() + 3);
        });

        List<ThingStatusInfo> statusUpdates = statusSubscriber.statusUpdates.get(ruuviThing.getUID());
        assertNotNull(statusUpdates);
        if (quickTimeout) {
            triggerTimeoutHandling(ruuviThing);
            waitForAssert(() -> {
                assertThingStatus(statusUpdates, statusUpdateIndex.get(), ThingStatus.OFFLINE,
                        ThingStatusDetail.COMMUNICATION_ERROR, "No valid data received for some time");

                CHANNEL_TO_ITEM_TYPE.keySet()
                        .forEach(channelId -> assertEquals(UnDefType.UNDEF, getItemState.apply(channelId)));
                statusUpdateIndex.incrementAndGet();
            });
        }

        // publish some valid data ("valid case" test vector from
        // https://docs.ruuvi.com/communication/bluetooth-advertisements/data-format-5-rawv2)
        scheduler.execute(() -> publish(BASE_TOPIC_RUUVI + "/mygwid/DE:AD:BE:EF:BB:02", """
                {\
                 "gw_mac": "DE:AD:BE:EF:00",\
                 "rssi": -82,\
                 "aoa": [],\
                 "gwts": "1659365432",\
                 "ts": "1659365222",\
                 "data": "0201061BFF99040512FC5394C37C0004FFFC040CAC364200CDCBB8334C884F",\
                 "coords": "" }"""));

        waitForAssert(() -> {
            assertThingStatus(statusUpdates, statusUpdateIndex.get(), ThingStatus.ONLINE);
            statusUpdateIndex.incrementAndGet();
        });

        waitForAssert(() -> {
            assertItems(getItemState, //
                    "24.3", // temperature, Celsius
                    "0.004", // acc X, g
                    "-0.004", // acc Y, g
                    "1.036", // acc Z, g
                    "2.9770000000000003", // battery, volt
                    5, // data format
                    "53.49", // humidity %
                    205, // measurement seq
                    66, // movement
                    "100044", // pressure, pascal
                    "4", // tx power, dBm
                    "-82", // RSSI, dBm
                    Instant.ofEpochSecond(1659365222), // ts
                    Instant.ofEpochSecond(1659365432), // gwts
                    "DE:AD:BE:EF:00" // gw mac
            );
        });

        if (quickTimeout) {
            triggerTimeoutHandling(ruuviThing);
            waitForAssert(() -> {
                assertThingStatus(statusUpdates, statusUpdateIndex.get(), ThingStatus.OFFLINE,
                        ThingStatusDetail.COMMUNICATION_ERROR, "No valid data received for some time");
                CHANNEL_TO_ITEM_TYPE.keySet()
                        .forEach(channelId -> assertEquals(UnDefType.UNDEF, getItemState.apply(channelId)));
                statusUpdateIndex.incrementAndGet();
            });

        }

        // Another mqtt update ("minimum values" test vector from
        // https://docs.ruuvi.com/communication/bluetooth-advertisements/data-format-5-rawv2)
        scheduler.execute(() -> publish(BASE_TOPIC_RUUVI + "/mygwid/DE:AD:BE:EF:BB:02", """
                {\
                 "gw_mac": "DE:AD:BE:EF:00",\
                 "rssi": -66,\
                 "aoa": [],\
                 "gwts": "1659365431",\
                 "ts": "1659365221",\
                 "data": "0201061BFF9904058001000000008001800180010000000000CBB8334C884F",\
                 "coords": "" }"""));
        if (quickTimeout) {
            // With quick timeout we were previously offline, so now we should be back online
            // with valid channels.
            waitForAssert(() -> {
                assertThingStatus(statusUpdates, statusUpdateIndex.get(), ThingStatus.ONLINE);
                statusUpdateIndex.getAndIncrement();
            });

            // ...after a while all items are updated
            waitForAssert(() -> {
                assertItems(getItemState, //
                        "-163.835", // temperature, Celsius
                        "-32.767", // acc X, g
                        "-32.767", // acc Y, g
                        "-32.767", // acc Z, g
                        "1.6", // battery, volt
                        5, // data format
                        "0.0", // humidity %
                        0, // measurement seq
                        0, // movement
                        "50000", // pressure, pascal
                        "-40", // tx power, dBm
                        "-66", // RSSI, dBm
                        Instant.ofEpochSecond(1659365221), // ts
                        Instant.ofEpochSecond(1659365431), // gwts
                        "DE:AD:BE:EF:00" // gw mac
                );
            });

            triggerTimeoutHandling(ruuviThing);
            waitForAssert(() -> {
                assertThingStatus(statusUpdates, statusUpdateIndex.get(), ThingStatus.OFFLINE,
                        ThingStatusDetail.COMMUNICATION_ERROR, "No valid data received for some time");
                CHANNEL_TO_ITEM_TYPE.keySet()
                        .forEach(channelId -> assertEquals(UnDefType.UNDEF, getItemState.apply(channelId)));
                statusUpdateIndex.getAndIncrement();
            });
        } else {
            // with non-quick timeout we are still online, and items are updated
            waitForAssert(() -> {
                assertItems(getItemState, //
                        "-163.835", // temperature, Celsius
                        "-32.767", // acc X, g
                        "-32.767", // acc Y, g
                        "-32.767", // acc Z, g
                        "1.6", // battery, volt
                        5, // data format
                        "0.0", // humidity %
                        0, // measurement seq
                        0, // movement
                        "50000", // pressure, pascal
                        "-40", // tx power, dBm
                        "-66", // RSSI, dBm
                        Instant.ofEpochSecond(1659365221), // ts
                        Instant.ofEpochSecond(1659365431), // gwts
                        "DE:AD:BE:EF:00" // gw mac
                );
            });
        }

        // assert that we have processed all status updates
        assertEquals(statusUpdateIndex.get(), statusUpdates.size());
    }
605 }