]> git.basschouten.com Git - openhab-addons.git/blob
ad5d6fcf14ce12490c76293419aa2e609186b0a1
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2023 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.mqtt.ruuvigateway;
14
15 import static org.hamcrest.CoreMatchers.is;
16 import static org.hamcrest.MatcherAssert.assertThat;
17 import static org.junit.jupiter.api.Assertions.*;
18 import static org.mockito.ArgumentMatchers.any;
19 import static org.mockito.Mockito.when;
20 import static org.openhab.binding.mqtt.ruuvigateway.internal.RuuviGatewayBindingConstants.*;
21 import static org.openhab.core.library.unit.MetricPrefix.HECTO;
22
23 import java.lang.reflect.InvocationTargetException;
24 import java.lang.reflect.Method;
25 import java.math.BigDecimal;
26 import java.time.Instant;
27 import java.time.ZoneId;
28 import java.util.ArrayList;
29 import java.util.HashMap;
30 import java.util.HashSet;
31 import java.util.List;
32 import java.util.Map;
33 import java.util.Objects;
34 import java.util.Set;
35 import java.util.concurrent.CompletableFuture;
36 import java.util.concurrent.CountDownLatch;
37 import java.util.concurrent.ScheduledExecutorService;
38 import java.util.concurrent.ScheduledThreadPoolExecutor;
39 import java.util.concurrent.TimeUnit;
40 import java.util.concurrent.atomic.AtomicInteger;
41 import java.util.function.Function;
42 import java.util.stream.Collectors;
43
44 import javax.measure.quantity.Acceleration;
45 import javax.measure.quantity.Dimensionless;
46 import javax.measure.quantity.ElectricPotential;
47 import javax.measure.quantity.Power;
48 import javax.measure.quantity.Pressure;
49 import javax.measure.quantity.Temperature;
50
51 import org.eclipse.jdt.annotation.NonNullByDefault;
52 import org.eclipse.jdt.annotation.Nullable;
53 import org.junit.jupiter.api.AfterEach;
54 import org.junit.jupiter.api.BeforeEach;
55 import org.junit.jupiter.api.Test;
56 import org.junit.jupiter.api.extension.ExtendWith;
57 import org.junit.jupiter.params.ParameterizedTest;
58 import org.junit.jupiter.params.provider.CsvSource;
59 import org.junit.jupiter.params.provider.ValueSource;
60 import org.mockito.Mock;
61 import org.mockito.junit.jupiter.MockitoExtension;
62 import org.mockito.junit.jupiter.MockitoSettings;
63 import org.mockito.quality.Strictness;
64 import org.openhab.binding.mqtt.discovery.MQTTTopicDiscoveryService;
65 import org.openhab.binding.mqtt.ruuvigateway.internal.RuuviGatewayBindingConstants;
66 import org.openhab.binding.mqtt.ruuvigateway.internal.discovery.RuuviGatewayDiscoveryService;
67 import org.openhab.binding.mqtt.ruuvigateway.internal.handler.RuuviTagHandler;
68 import org.openhab.core.config.core.Configuration;
69 import org.openhab.core.i18n.UnitProvider;
70 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
71 import org.openhab.core.io.transport.mqtt.MqttConnectionObserver;
72 import org.openhab.core.io.transport.mqtt.MqttConnectionState;
73 import org.openhab.core.items.GenericItem;
74 import org.openhab.core.library.CoreItemFactory;
75 import org.openhab.core.library.types.DateTimeType;
76 import org.openhab.core.library.types.DecimalType;
77 import org.openhab.core.library.types.QuantityType;
78 import org.openhab.core.library.types.StringType;
79 import org.openhab.core.library.unit.SIUnits;
80 import org.openhab.core.library.unit.Units;
81 import org.openhab.core.thing.Bridge;
82 import org.openhab.core.thing.Channel;
83 import org.openhab.core.thing.ChannelUID;
84 import org.openhab.core.thing.Thing;
85 import org.openhab.core.thing.ThingStatus;
86 import org.openhab.core.thing.ThingStatusDetail;
87 import org.openhab.core.thing.ThingStatusInfo;
88 import org.openhab.core.thing.ThingTypeUID;
89 import org.openhab.core.thing.ThingUID;
90 import org.openhab.core.thing.binding.ThingHandler;
91 import org.openhab.core.thing.binding.builder.BridgeBuilder;
92 import org.openhab.core.thing.binding.builder.ChannelBuilder;
93 import org.openhab.core.thing.binding.builder.ThingBuilder;
94 import org.openhab.core.thing.link.ItemChannelLink;
95 import org.openhab.core.types.State;
96 import org.openhab.core.types.UnDefType;
97
98 /**
99  * A full implementation test, that starts the embedded MQTT broker and publishes test data
100  *
101  * @author David Graeff - Initial contribution
102  * @author Sami Salonen - Adapted and extended to Ruuvi Gateway tests
103  */
104 @NonNullByDefault
105 @ExtendWith(MockitoExtension.class)
106 @MockitoSettings(strictness = Strictness.LENIENT)
107 public class RuuviGatewayTest extends MqttOSGiTest {
108     protected @Mock @NonNullByDefault({}) UnitProvider mockedUnitProvider;
109     private static final String BASE_TOPIC_RUUVI = "ruuvi";
110     private static final Map<String, String> CHANNEL_TO_ITEM_TYPE = new HashMap<>();
111     static {
112         CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_ACCELERATIONX, "Number:Acceleration");
113         CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_ACCELERATIONY, "Number:Acceleration");
114         CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_ACCELERATIONZ, "Number:Acceleration");
115         CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_BATTERY, "Number:ElectricPotential");
116         CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_DATA_FORMAT, "Number");
117         CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_HUMIDITY, "Number:Dimensionless");
118         CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_MEASUREMENT_SEQUENCE_NUMBER, "Number");
119         CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_MOVEMENT_COUNTER, "Number");
120         CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_PRESSURE, "Number:Pressure");
121         CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_TEMPERATURE, "Number:Temperature");
122         CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_TX_POWER, "Number:Power");
123         CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_RSSI, "Number:Power");
124         CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_TS, "DateTime");
125         CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_GWTS, "DateTime");
126         CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_GWMAC, "String");
127     }
128
129     private ThingStatusInfoChangedSubscriber statusSubscriber = new ThingStatusInfoChangedSubscriber();
130     private @NonNullByDefault({}) MqttBrokerConnection mqttConnection;
131     private int registeredTopics = 100;
132
133     private @NonNullByDefault({}) ScheduledExecutorService scheduler;
134
135     /**
136      * Create an observer that fails the test as soon as the broker client connection changes its connection state
137      * to something else then CONNECTED.
138      */
139     private MqttConnectionObserver failIfChange = (state, error) -> assertThat(state,
140             is(MqttConnectionState.CONNECTED));
141
142     @SuppressWarnings("unused") // used indirectly with Inbox
143     private @NonNullByDefault({}) RuuviGatewayDiscoveryService ruuviDiscoveryService;
144     private Set<Thing> things = new HashSet<>();
145
146     @BeforeEach
147     public void setup() {
148         when(mockedUnitProvider.getUnit(any())).then(i -> {
149             Class clazz = i.getArgument(0);
150             if (Temperature.class.equals(clazz)) {
151                 return SIUnits.CELSIUS;
152             } else if (Acceleration.class.equals(clazz)) {
153                 return Units.METRE_PER_SQUARE_SECOND;
154             } else if (Dimensionless.class.equals(clazz)) {
155                 return Units.ONE;
156             } else if (ElectricPotential.class.equals(clazz)) {
157                 return Units.VOLT;
158             } else if (Pressure.class.equals(clazz)) {
159                 return HECTO(SIUnits.PASCAL);
160             } else if (Power.class.equals(clazz)) {
161                 return Units.WATT;
162             }
163             return null;
164         });
165     }
166
167     private Bridge createMqttBrokerBridge() {
168         Configuration configuration = new Configuration();
169         configuration.put("host", "127.0.0.1");
170         configuration.put("port", brokerConnection.getPort());
171         Bridge bridge = BridgeBuilder.create(new ThingTypeUID("mqtt", "broker"), "mybroker").withLabel("MQTT Broker")
172                 .withConfiguration(configuration).build();
173         thingProvider.add(bridge);
174         waitForAssert(() -> assertNotNull(bridge.getHandler()));
175         assertNotNull(bridge.getConfiguration());
176         things.add(bridge);
177         return bridge;
178     }
179
180     private Thing createRuuviThing(String brokerPrefix, String topic, @Nullable Integer timeoutMillisecs) {
181         Configuration configuration = new Configuration();
182         configuration.put(RuuviGatewayBindingConstants.CONFIGURATION_PROPERTY_TOPIC, topic);
183         if (timeoutMillisecs != null) {
184             configuration.put(RuuviGatewayBindingConstants.CONFIGURATION_PROPERTY_TIMEOUT, timeoutMillisecs);
185         }
186         ThingUID bridgeThingUID = new ThingUID("mqtt", "broker", "mybroker");
187         ThingUID thingUID = new ThingUID(RuuviGatewayBindingConstants.THING_TYPE_BEACON,
188                 topic.replaceAll("[:_/]", "_"));
189         ThingBuilder thingBuilder = ThingBuilder.create(RuuviGatewayBindingConstants.THING_TYPE_BEACON, thingUID)
190                 .withBridge(bridgeThingUID).withLabel("Ruuvi " + topic).withConfiguration(configuration);
191
192         CHANNEL_TO_ITEM_TYPE.forEach((channelId, _itemType) -> {
193             thingBuilder.withChannel(ChannelBuilder.create(new ChannelUID(thingUID, channelId)).build());
194         });
195
196         Thing thing = thingBuilder.build();
197         thingProvider.add(thing);
198         waitForAssert(() -> assertNotNull(thing.getHandler()));
199         assertNotNull(thing.getConfiguration());
200         things.add(thing);
201         return thing;
202     }
203
204     private void triggerTimeoutHandling(Thing ruuviThing) {
205         // Simulate some time passing, so that RuuviTagHandler.heartbeat() is called twice
206         // Two heartbeat calls happens to trigger timeout handling in handler, one is not enough.
207         // (this is really implementation detail of RuuviTagHandler, making this test slightly
208         // error prone to possible changes in RuuviTagHandler implementation)
209         //
210         // 0. Assume some data received already, RuuviTagHandler.receivedData is true
211         // 1. First heartbeat sets receivedData=false; no further action is taken yet
212         // 2. Second heartbeat acts on false receivedData, e.g. updating Thing Status
213         for (int i = 0; i < 2; i++) {
214             callInternalHeartbeat(ruuviThing);
215         }
216     }
217
218     private void callInternalHeartbeat(Thing ruuviThing) {
219         ThingHandler handler = ruuviThing.getHandler();
220         Objects.requireNonNull(handler);
221         assertInstanceOf(RuuviTagHandler.class, handler);
222         RuuviTagHandler ruuviHandler = (RuuviTagHandler) handler;
223         try {
224             Method heartbeatMethod = RuuviTagHandler.class.getDeclaredMethod("heartbeat");
225             Objects.requireNonNull(heartbeatMethod);
226             heartbeatMethod.setAccessible(true);
227             heartbeatMethod.invoke(ruuviHandler);
228         } catch (NoSuchMethodException | SecurityException | IllegalAccessException | IllegalArgumentException
229                 | InvocationTargetException e) {
230             fail("Failed to call heartbeat method of thing handler via reflection. Bug in test? Details: "
231                     + e.getClass().getSimpleName() + ": " + e.getMessage());
232             throw new RuntimeException(e);
233         }
234     }
235
236     private String itemName(ChannelUID channelUID) {
237         return channelUID.getAsString().replace(":", "_");
238     }
239
240     private String linkChannelToAutogeneratedItem(ChannelUID channelUID) {
241         String itemName = itemName(channelUID);
242         String itemType = CHANNEL_TO_ITEM_TYPE.get(channelUID.getId());
243         GenericItem item = new CoreItemFactory(mockedUnitProvider).createItem(itemType, itemName);
244         assertNotNull(item, itemType);
245         itemProvider.add(item);
246         itemChannelLinkProvider.add(new ItemChannelLink(itemName, channelUID));
247         return itemName;
248     }
249
250     @Override
251     @BeforeEach
252     public void beforeEach() throws Exception {
253         super.beforeEach();
254
255         statusSubscriber.statusUpdates.clear();
256         registerService(statusSubscriber);
257
258         MQTTTopicDiscoveryService mqttTopicDiscoveryService = getService(MQTTTopicDiscoveryService.class);
259         assertNotNull(mqttTopicDiscoveryService);
260         ruuviDiscoveryService = new RuuviGatewayDiscoveryService(mqttTopicDiscoveryService);
261
262         createMqttBrokerBridge();
263
264         mqttConnection = createBrokerConnection("myclientid");
265
266         // If the connection state changes in between -> fail
267         mqttConnection.addConnectionObserver(failIfChange);
268
269         List<CompletableFuture<Boolean>> futures = new ArrayList<>();
270         futures.add(publish(BASE_TOPIC_RUUVI + "/mygwid/DE:AD:BE:EF:AA:00", "{}"));
271
272         registeredTopics = futures.size();
273         CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(5, TimeUnit.SECONDS);
274
275         scheduler = new ScheduledThreadPoolExecutor(6);
276     }
277
278     @Override
279     @AfterEach
280     public void afterEach() throws Exception {
281         if (mqttConnection != null) {
282             mqttConnection.removeConnectionObserver(failIfChange);
283             mqttConnection.stop().get(5, TimeUnit.SECONDS);
284         }
285         things.stream().map(thing -> thingProvider.remove(thing.getUID()));
286         unregisterService(statusSubscriber);
287
288         if (scheduler != null) {
289             scheduler.shutdownNow();
290         }
291         super.afterEach();
292     }
293
294     @Test
295     public void retrieveAllRuuviPrefixedTopics() throws Exception {
296         CountDownLatch c = new CountDownLatch(registeredTopics);
297         mqttConnection.subscribe(BASE_TOPIC_RUUVI + "/#", (topic, payload) -> c.countDown()).get(5, TimeUnit.SECONDS);
298         assertTrue(c.await(5, TimeUnit.SECONDS),
299                 "Connection " + mqttConnection.getClientId() + " not retrieving all topics ");
300     }
301
302     private void assertThingStatus(List<ThingStatusInfo> statusUpdates, int index, ThingStatus status,
303             @Nullable ThingStatusDetail detail, @Nullable String description) {
304         assertTrue(statusUpdates.size() > index,
305                 String.format("Not enough status updates. Expected %d, but only had %d. Status updates received: %s",
306                         index + 1, statusUpdates.size(),
307                         statusUpdates.stream().map(ThingStatusInfo::getStatus).collect(Collectors.toList())));
308         assertEquals(status, statusUpdates.get(index).getStatus(), statusUpdates.get(index).toString());
309         assertEquals(detail, statusUpdates.get(index).getStatusDetail(), statusUpdates.get(index).toString());
310         assertEquals(description, statusUpdates.get(index).getDescription(), statusUpdates.get(index).toString());
311     }
312
313     @SuppressWarnings("null")
314     private void assertThingStatusWithDescriptionPattern(List<ThingStatusInfo> statusUpdates, int index,
315             ThingStatus status, ThingStatusDetail detail, String descriptionPattern) {
316         assertTrue(statusUpdates.size() > index, "assert " + statusUpdates.size() + " > " + index + " failed");
317         assertEquals(status, statusUpdates.get(index).getStatus(), statusUpdates.get(index).toString());
318         assertEquals(detail, statusUpdates.get(index).getStatusDetail(), statusUpdates.get(index).toString());
319         assertTrue(statusUpdates.get(index).getDescription().matches(descriptionPattern),
320                 statusUpdates.get(index).toString());
321     }
322
323     private void assertThingStatus(List<ThingStatusInfo> statusUpdates, int index, ThingStatus status) {
324         assertThingStatus(statusUpdates, index, status, ThingStatusDetail.NONE, null);
325     }
326
327     private void assertItems(Function<String, State> channelStateGetter, String temperatureCelsius,
328             String accelerationXStandardGravity, String accelerationYStandardGravity,
329             String accelerationZStandardGravity, String batteryVolt, int dataFormat, String humidityPercent,
330             int measurementSequenceNumber, int movementCounter, String pressurePascal, String txPowerDecibelMilliwatts,
331             String rssiDecibelMilliwatts, Instant ts, Instant gwts, String gwMac) {
332         assertEquals(new QuantityType<>(new BigDecimal(temperatureCelsius), SIUnits.CELSIUS),
333                 channelStateGetter.apply(CHANNEL_ID_TEMPERATURE));
334         assertEquals(new QuantityType<>(new BigDecimal(accelerationXStandardGravity), Units.STANDARD_GRAVITY),
335                 channelStateGetter.apply(CHANNEL_ID_ACCELERATIONX));
336         assertEquals(new QuantityType<>(new BigDecimal(accelerationYStandardGravity), Units.STANDARD_GRAVITY),
337                 channelStateGetter.apply(CHANNEL_ID_ACCELERATIONY));
338         assertEquals(new QuantityType<>(new BigDecimal(accelerationZStandardGravity), Units.STANDARD_GRAVITY),
339                 channelStateGetter.apply(CHANNEL_ID_ACCELERATIONZ));
340         assertEquals(new QuantityType<>(new BigDecimal(batteryVolt), Units.VOLT),
341                 channelStateGetter.apply(CHANNEL_ID_BATTERY));
342         assertEquals(new DecimalType(dataFormat), channelStateGetter.apply(CHANNEL_ID_DATA_FORMAT));
343         assertEquals(new QuantityType<>(new BigDecimal(humidityPercent), Units.PERCENT),
344                 channelStateGetter.apply(CHANNEL_ID_HUMIDITY));
345         assertEquals(new DecimalType(new BigDecimal(measurementSequenceNumber)),
346                 channelStateGetter.apply(CHANNEL_ID_MEASUREMENT_SEQUENCE_NUMBER));
347         assertEquals(new DecimalType(new BigDecimal(movementCounter)),
348                 channelStateGetter.apply(CHANNEL_ID_MOVEMENT_COUNTER));
349         assertEquals(new QuantityType<>(new BigDecimal(pressurePascal), SIUnits.PASCAL),
350                 channelStateGetter.apply(CHANNEL_ID_PRESSURE));
351         assertEquals(new QuantityType<>(new BigDecimal(txPowerDecibelMilliwatts), Units.DECIBEL_MILLIWATTS),
352                 channelStateGetter.apply(CHANNEL_ID_TX_POWER));
353
354         assertEquals(new QuantityType<>(new BigDecimal(rssiDecibelMilliwatts), Units.DECIBEL_MILLIWATTS),
355                 channelStateGetter.apply(CHANNEL_ID_RSSI));
356         assertEquals(new DateTimeType(ts.atZone(ZoneId.of("UTC"))), channelStateGetter.apply(CHANNEL_ID_TS));
357         assertEquals(new DateTimeType(gwts.atZone(ZoneId.of("UTC"))), channelStateGetter.apply(CHANNEL_ID_GWTS));
358         assertEquals(new StringType(gwMac), channelStateGetter.apply(CHANNEL_ID_GWMAC));
359     }
360
361     @ParameterizedTest
362     @CsvSource(delimiter = '@', value = { //
363             BASE_TOPIC_RUUVI + "mygwid/DE:AD:BE:EF:AA:01 @" + "{}", // empty json
364             BASE_TOPIC_RUUVI + "mygwid/DE:AD:BE:EF:BB:03 @" + "invalid json", // invalid json
365             BASE_TOPIC_RUUVI + "mygwid/DE:AD:BE:EF:BB:04 @" + "0201061BFF990405", // payload too short
366             BASE_TOPIC_RUUVI + "mygwid/DE:AD:BE:EF:BB:05 @"
367                     + "0201061BFF99050512FC5394C37C0004FFFC040CAC364200CDCBB8334C884F", // wrong manufacturer id (the
368                                                                                         // two bytes after FF do not
369                                                                                         // match 99 04)
370             BASE_TOPIC_RUUVI + "mygwid/DE:AD:BE:EF:BB:06 @"
371                     + "0201061BFA99040512FC5394C37C0004FFFC040CAC364200CDCBB8334C884F", // unexpected advertisement (no
372                                                                                         // FF to indicate 'manufacturer
373                                                                                         // specific' advertisement)
374             BASE_TOPIC_RUUVI + "mygwid/DE:AD:BE:EF:BB:07 @" + "{" + "  \"gw_mac\": \"DE:AD:BE:EF:00\","
375                     + "  \"rssi\": -82," + "  \"aoa\": [],"
376                     // data field is number, not a string
377                     + "  \"gwts\": \"1659365432\"," + "  \"ts\": \"1659365222\"," + "  \"data\": 999,"
378                     + "  \"coords\": \"\" }", // wrong json data types
379     })
380     public void testInvalidCases(String topic, String val) throws Exception {
381         final String jsonPayload;
382         if (val.contains("{")) {
383             // test argument is specifiying the whole json payload
384             jsonPayload = val;
385         } else {
386             // test argument is only specifiying the data field in the json payload
387             // Fill rest of the fields with some valid values
388             jsonPayload = """
389                     {\
390                       "gw_mac": "DE:AD:BE:EF:00",\
391                       "rssi": -82,\
392                       "aoa": [],\
393                       "gwts": "1659365432",\
394                       "ts": "1659365222",\
395                       "data": "\
396                     """ + val + "\"," + "  \"coords\": \"\" }";
397         }
398
399         Thing ruuviThing = createRuuviThing("mygwid", topic, 100);
400         waitForAssert(() -> {
401             List<ThingStatusInfo> statusUpdates = statusSubscriber.statusUpdates.get(ruuviThing.getUID());
402             assertNotNull(statusUpdates);
403             int statusUpdateIndex = 0;
404             assertThingStatus(statusUpdates, statusUpdateIndex++, ThingStatus.INITIALIZING);
405             assertThingStatus(statusUpdates, statusUpdateIndex++, ThingStatus.UNKNOWN);
406             assertThingStatus(statusUpdates, statusUpdateIndex++, ThingStatus.ONLINE, ThingStatusDetail.NONE,
407                     "Waiting for initial data");
408             assertThingStatus(statusUpdates, statusUpdateIndex++, ThingStatus.OFFLINE,
409                     ThingStatusDetail.COMMUNICATION_ERROR, "No valid data received for some time");
410             scheduler.execute(() -> publish(topic, jsonPayload));
411             assertThingStatusWithDescriptionPattern(statusUpdates, statusUpdateIndex++, ThingStatus.OFFLINE,
412                     ThingStatusDetail.COMMUNICATION_ERROR, ".*could not be parsed.*");
413             assertEquals(statusUpdateIndex, statusUpdates.size());
414         });
415     }
416
417     @SuppressWarnings("null")
418     @Test
419     public void testDiscovery() {
420         scheduler.execute(() -> publish(BASE_TOPIC_RUUVI + "/mygwid/DE:AD:BE:EF:BB:02", """
421                 {\
422                  "gw_mac": "DE:AD:BE:EF:00",\
423                  "rssi": -82,\
424                  "aoa": [],\
425                  "gwts": "1659365432",\
426                  "ts": "1659365222",\
427                  "data": "0201061BFF99040512FC5394C37C0004FFFC040CAC364200CDCBB8334C884F",\
428                  "coords": "" }"""));
429         waitForAssert(() -> {
430             assertEquals(2, inbox.getAll().size(), inbox.getAll().toString());
431             var discovered = new HashSet<>(inbox.getAll());
432             for (var result : discovered) {
433                 assertEquals(THING_TYPE_BEACON, result.getThingTypeUID());
434                 assertEquals("topic", result.getRepresentationProperty());
435                 Object topic = result.getProperties().get("topic");
436                 assertNotNull(topic);
437                 assertTrue(
438                         // published in this test
439                         (BASE_TOPIC_RUUVI + "/mygwid/DE:AD:BE:EF:BB:02").equals(topic)
440                                 // published in beforeEach
441                                 || (BASE_TOPIC_RUUVI + "/mygwid/DE:AD:BE:EF:AA:00")
442                                         .equals(result.getProperties().get("topic")));
443             }
444         });
445     }
446
447     @ParameterizedTest
448     @ValueSource(booleans = { true, false })
449     public void testHappyFlow(boolean quickTimeout) {
450         // with quickTimeout=false, heartbeat is effectively disabled. Thing will not "timeout" and go OFFLINE
451         // with quickTimeout=true, timeout happens very fast. In CI we use infinite timeout and trigger timeout manually
452
453         Thing ruuviThing = createRuuviThing("mygwid", BASE_TOPIC_RUUVI + "/mygwid/DE:AD:BE:EF:BB:02", 9_000_000);
454         // Link all channels to freshly created items
455         ruuviThing.getChannels().stream().map(Channel::getUID).forEach(this::linkChannelToAutogeneratedItem);
456
457         @SuppressWarnings("null")
458         Function<String, State> getItemState = channelId -> itemRegistry
459                 .get(itemName(ruuviThing.getChannel(channelId).getUID())).getState();
460
461         AtomicInteger statusUpdateIndex = new AtomicInteger();
462         waitForAssert(() -> {
463             List<ThingStatusInfo> statusUpdates = statusSubscriber.statusUpdates.get(ruuviThing.getUID());
464             assertNotNull(statusUpdates);
465
466             assertThingStatus(statusUpdates, statusUpdateIndex.get(), ThingStatus.INITIALIZING);
467             assertThingStatus(statusUpdates, statusUpdateIndex.get() + 1, ThingStatus.UNKNOWN);
468             assertThingStatus(statusUpdates, statusUpdateIndex.get() + 2, ThingStatus.ONLINE, ThingStatusDetail.NONE,
469                     "Waiting for initial data");
470
471             statusUpdateIndex.set(statusUpdateIndex.get() + 3);
472         });
473
474         List<ThingStatusInfo> statusUpdates = statusSubscriber.statusUpdates.get(ruuviThing.getUID());
475         assertNotNull(statusUpdates);
476         if (quickTimeout) {
477             triggerTimeoutHandling(ruuviThing);
478             waitForAssert(() -> {
479                 assertThingStatus(statusUpdates, statusUpdateIndex.get(), ThingStatus.OFFLINE,
480                         ThingStatusDetail.COMMUNICATION_ERROR, "No valid data received for some time");
481
482                 CHANNEL_TO_ITEM_TYPE.keySet()
483                         .forEach(channelId -> assertEquals(UnDefType.UNDEF, getItemState.apply(channelId)));
484                 statusUpdateIndex.incrementAndGet();
485             });
486         }
487
488         // publish some valid data ("valid case" test vector from
489         // https://docs.ruuvi.com/communication/bluetooth-advertisements/data-format-5-rawv2)
490         scheduler.execute(() -> publish(BASE_TOPIC_RUUVI + "/mygwid/DE:AD:BE:EF:BB:02", """
491                 {\
492                  "gw_mac": "DE:AD:BE:EF:00",\
493                  "rssi": -82,\
494                  "aoa": [],\
495                  "gwts": "1659365432",\
496                  "ts": "1659365222",\
497                  "data": "0201061BFF99040512FC5394C37C0004FFFC040CAC364200CDCBB8334C884F",\
498                  "coords": "" }"""));
499
500         waitForAssert(() -> {
501             assertThingStatus(statusUpdates, statusUpdateIndex.get(), ThingStatus.ONLINE);
502             statusUpdateIndex.incrementAndGet();
503         });
504
505         waitForAssert(() -> {
506             assertItems(getItemState, //
507                     "24.3", // temperature, Celsius
508                     "0.004", // acc X, g
509                     "-0.004", // acc Y, g
510                     "1.036", // acc Z, g
511                     "2.9770000000000003", // battery, volt
512                     5, // data format
513                     "53.49", // humidity %
514                     205, // measurement seq
515                     66, // movement
516                     "100044", // pressure, pascal
517                     "4", // tx power, dBm
518                     "-82", // RSSI, dBm
519                     Instant.ofEpochSecond(1659365222), // ts
520                     Instant.ofEpochSecond(1659365432), // gwts
521                     "DE:AD:BE:EF:00" // gw mac
522             );
523         });
524
525         if (quickTimeout) {
526             triggerTimeoutHandling(ruuviThing);
527             waitForAssert(() -> {
528                 assertThingStatus(statusUpdates, statusUpdateIndex.get(), ThingStatus.OFFLINE,
529                         ThingStatusDetail.COMMUNICATION_ERROR, "No valid data received for some time");
530                 CHANNEL_TO_ITEM_TYPE.keySet()
531                         .forEach(channelId -> assertEquals(UnDefType.UNDEF, getItemState.apply(channelId)));
532                 statusUpdateIndex.incrementAndGet();
533             });
534         }
535
536         // Another mqtt update (("minimum values" test vector from
537         // https://docs.ruuvi.com/communication/bluetooth-advertisements/data-format-5-rawv2)
538         scheduler.execute(() -> publish(BASE_TOPIC_RUUVI + "/mygwid/DE:AD:BE:EF:BB:02", """
539                 {\
540                  "gw_mac": "DE:AD:BE:EF:00",\
541                  "rssi": -66,\
542                  "aoa": [],\
543                  "gwts": "1659365431",\
544                  "ts": "1659365221",\
545                  "data": "0201061BFF9904058001000000008001800180010000000000CBB8334C884F",\
546                  "coords": "" }"""));
547         if (quickTimeout) {
548             // With quick timeout we were previously offline, so now we should be back online
549             // with valid channels.
550             waitForAssert(() -> {
551                 assertThingStatus(statusUpdates, statusUpdateIndex.get(), ThingStatus.ONLINE);
552                 statusUpdateIndex.getAndIncrement();
553             });
554
555             // ...after a while all items are updated
556             waitForAssert(() -> {
557                 assertItems(getItemState, //
558                         "-163.835", // temperature, Celsius
559                         "-32.767", // acc X, g
560                         "-32.767", // acc Y, g
561                         "-32.767", // acc Z, g
562                         "1.6", // battery, volt
563                         5, // data format
564                         "0.0", // humidity %
565                         0, // measurement seq
566                         0, // movement
567                         "50000", // pressure, pascal
568                         "-40", // tx power, dBm
569                         "-66", // RSSI, dBm
570                         Instant.ofEpochSecond(1659365221), // ts
571                         Instant.ofEpochSecond(1659365431), // gwts
572                         "DE:AD:BE:EF:00" // gw mac
573                 );
574             });
575
576             triggerTimeoutHandling(ruuviThing);
577             waitForAssert(() -> {
578                 assertThingStatus(statusUpdates, statusUpdateIndex.get(), ThingStatus.OFFLINE,
579                         ThingStatusDetail.COMMUNICATION_ERROR, "No valid data received for some time");
580                 CHANNEL_TO_ITEM_TYPE.keySet()
581                         .forEach(channelId -> assertEquals(UnDefType.UNDEF, getItemState.apply(channelId)));
582                 statusUpdateIndex.getAndIncrement();
583             });
584         } else {
585             // with non-quick timeout we are still online, and items are updated
586             waitForAssert(() -> {
587                 assertItems(getItemState, //
588                         "-163.835", // temperature, Celsius
589                         "-32.767", // acc X, g
590                         "-32.767", // acc Y, g
591                         "-32.767", // acc Z, g
592                         "1.6", // battery, volt
593                         5, // data format
594                         "0.0", // humidity %
595                         0, // measurement seq
596                         0, // movement
597                         "50000", // pressure, pascal
598                         "-40", // tx power, dBm
599                         "-66", // RSSI, dBm
600                         Instant.ofEpochSecond(1659365221), // ts
601                         Instant.ofEpochSecond(1659365431), // gwts
602                         "DE:AD:BE:EF:00" // gw mac
603                 );
604             });
605         }
606
607         // assert that we have processed all status updates
608         assertEquals(statusUpdateIndex.get(), statusUpdates.size());
609     }
610 }