]> git.basschouten.com Git - openhab-addons.git/blob
b815cd962ba28f3fe81ef44faab83e99fad9d672
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2023 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.mqtt.ruuvigateway;
14
15 import static org.hamcrest.CoreMatchers.is;
16 import static org.hamcrest.MatcherAssert.assertThat;
17 import static org.junit.jupiter.api.Assertions.*;
18 import static org.mockito.Mockito.*;
19 import static org.openhab.binding.mqtt.ruuvigateway.internal.RuuviGatewayBindingConstants.*;
20 import static org.openhab.core.library.unit.MetricPrefix.HECTO;
21
22 import java.lang.reflect.InvocationTargetException;
23 import java.lang.reflect.Method;
24 import java.math.BigDecimal;
25 import java.time.Instant;
26 import java.time.ZoneId;
27 import java.util.ArrayList;
28 import java.util.HashMap;
29 import java.util.HashSet;
30 import java.util.List;
31 import java.util.Map;
32 import java.util.Objects;
33 import java.util.Set;
34 import java.util.concurrent.CompletableFuture;
35 import java.util.concurrent.CountDownLatch;
36 import java.util.concurrent.ScheduledExecutorService;
37 import java.util.concurrent.ScheduledThreadPoolExecutor;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.atomic.AtomicInteger;
40 import java.util.function.Function;
41 import java.util.stream.Collectors;
42
43 import javax.measure.quantity.Acceleration;
44 import javax.measure.quantity.Dimensionless;
45 import javax.measure.quantity.ElectricPotential;
46 import javax.measure.quantity.Power;
47 import javax.measure.quantity.Pressure;
48 import javax.measure.quantity.Temperature;
49
50 import org.eclipse.jdt.annotation.NonNullByDefault;
51 import org.eclipse.jdt.annotation.Nullable;
52 import org.junit.jupiter.api.AfterEach;
53 import org.junit.jupiter.api.BeforeEach;
54 import org.junit.jupiter.api.Test;
55 import org.junit.jupiter.api.extension.ExtendWith;
56 import org.junit.jupiter.params.ParameterizedTest;
57 import org.junit.jupiter.params.provider.CsvSource;
58 import org.junit.jupiter.params.provider.ValueSource;
59 import org.mockito.Mock;
60 import org.mockito.junit.jupiter.MockitoExtension;
61 import org.mockito.junit.jupiter.MockitoSettings;
62 import org.mockito.quality.Strictness;
63 import org.openhab.binding.mqtt.discovery.MQTTTopicDiscoveryService;
64 import org.openhab.binding.mqtt.ruuvigateway.internal.RuuviGatewayBindingConstants;
65 import org.openhab.binding.mqtt.ruuvigateway.internal.discovery.RuuviGatewayDiscoveryService;
66 import org.openhab.binding.mqtt.ruuvigateway.internal.handler.RuuviTagHandler;
67 import org.openhab.core.config.core.Configuration;
68 import org.openhab.core.config.discovery.DiscoveryResult;
69 import org.openhab.core.i18n.UnitProvider;
70 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
71 import org.openhab.core.io.transport.mqtt.MqttConnectionObserver;
72 import org.openhab.core.io.transport.mqtt.MqttConnectionState;
73 import org.openhab.core.items.GenericItem;
74 import org.openhab.core.library.CoreItemFactory;
75 import org.openhab.core.library.types.DateTimeType;
76 import org.openhab.core.library.types.DecimalType;
77 import org.openhab.core.library.types.QuantityType;
78 import org.openhab.core.library.types.StringType;
79 import org.openhab.core.library.unit.SIUnits;
80 import org.openhab.core.library.unit.Units;
81 import org.openhab.core.thing.Bridge;
82 import org.openhab.core.thing.Channel;
83 import org.openhab.core.thing.ChannelUID;
84 import org.openhab.core.thing.Thing;
85 import org.openhab.core.thing.ThingStatus;
86 import org.openhab.core.thing.ThingStatusDetail;
87 import org.openhab.core.thing.ThingStatusInfo;
88 import org.openhab.core.thing.ThingTypeUID;
89 import org.openhab.core.thing.ThingUID;
90 import org.openhab.core.thing.binding.ThingHandler;
91 import org.openhab.core.thing.binding.builder.BridgeBuilder;
92 import org.openhab.core.thing.binding.builder.ChannelBuilder;
93 import org.openhab.core.thing.binding.builder.ThingBuilder;
94 import org.openhab.core.thing.link.ItemChannelLink;
95 import org.openhab.core.types.State;
96 import org.openhab.core.types.UnDefType;
97
98 /**
99  * A full implementation test, that starts the embedded MQTT broker and publishes test data
100  *
101  * @author David Graeff - Initial contribution
102  * @author Sami Salonen - Adapted and extended to Ruuvi Gateway tests
103  */
104 @NonNullByDefault
105 @ExtendWith(MockitoExtension.class)
106 @MockitoSettings(strictness = Strictness.LENIENT)
107 public class RuuviGatewayTest extends MqttOSGiTest {
108     protected @Mock @NonNullByDefault({}) UnitProvider mockedUnitProvider;
109     private static final String BASE_TOPIC_RUUVI = "ruuvi";
110     private static final Map<String, String> CHANNEL_TO_ITEM_TYPE = new HashMap<>();
111     static {
112         CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_ACCELERATIONX, "Number:Acceleration");
113         CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_ACCELERATIONY, "Number:Acceleration");
114         CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_ACCELERATIONZ, "Number:Acceleration");
115         CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_BATTERY, "Number:ElectricPotential");
116         CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_DATA_FORMAT, "Number");
117         CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_HUMIDITY, "Number:Dimensionless");
118         CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_MEASUREMENT_SEQUENCE_NUMBER, "Number");
119         CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_MOVEMENT_COUNTER, "Number");
120         CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_PRESSURE, "Number:Pressure");
121         CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_TEMPERATURE, "Number:Temperature");
122         CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_TX_POWER, "Number:Power");
123         CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_RSSI, "Number:Power");
124         CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_TS, "DateTime");
125         CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_GWTS, "DateTime");
126         CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_GWMAC, "String");
127     }
128
129     private ThingStatusInfoChangedSubscriber statusSubscriber = new ThingStatusInfoChangedSubscriber();
130     private @NonNullByDefault({}) MqttBrokerConnection mqttConnection;
131     private int registeredTopics = 100;
132
133     private @NonNullByDefault({}) ScheduledExecutorService scheduler;
134
135     /**
136      * Create an observer that fails the test as soon as the broker client connection changes its connection state
137      * to something else then CONNECTED.
138      */
139     private MqttConnectionObserver failIfChange = (state, error) -> assertThat(state,
140             is(MqttConnectionState.CONNECTED));
141
142     @SuppressWarnings("unused") // used indirectly with Inbox
143     private @NonNullByDefault({}) RuuviGatewayDiscoveryService ruuviDiscoveryService;
144     private Set<Thing> things = new HashSet<>();
145
146     @BeforeEach
147     public void setup() {
148         when(mockedUnitProvider.getUnit(any())).then(i -> {
149             Class clazz = i.getArgument(0);
150             if (Temperature.class.equals(clazz)) {
151                 return SIUnits.CELSIUS;
152             } else if (Acceleration.class.equals(clazz)) {
153                 return Units.METRE_PER_SQUARE_SECOND;
154             } else if (Dimensionless.class.equals(clazz)) {
155                 return Units.ONE;
156             } else if (ElectricPotential.class.equals(clazz)) {
157                 return Units.VOLT;
158             } else if (Pressure.class.equals(clazz)) {
159                 return HECTO(SIUnits.PASCAL);
160             } else if (Power.class.equals(clazz)) {
161                 return Units.WATT;
162             }
163             return null;
164         });
165     }
166
167     private Bridge createMqttBrokerBridge() {
168         Configuration configuration = new Configuration();
169         configuration.put("host", "127.0.0.1");
170         configuration.put("port", brokerConnection.getPort());
171         Bridge bridge = BridgeBuilder.create(new ThingTypeUID("mqtt", "broker"), "mybroker").withLabel("MQTT Broker")
172                 .withConfiguration(configuration).build();
173         thingProvider.add(bridge);
174         waitForAssert(() -> assertNotNull(bridge.getHandler()));
175         assertNotNull(bridge.getConfiguration());
176         things.add(bridge);
177         return bridge;
178     }
179
180     private Thing createRuuviThing(String brokerPrefix, String topic, @Nullable Integer timeoutMillisecs) {
181         Configuration configuration = new Configuration();
182         configuration.put(RuuviGatewayBindingConstants.CONFIGURATION_PROPERTY_TOPIC, topic);
183         if (timeoutMillisecs != null) {
184             configuration.put(RuuviGatewayBindingConstants.CONFIGURATION_PROPERTY_TIMEOUT, timeoutMillisecs);
185         }
186         ThingUID bridgeThingUID = new ThingUID("mqtt", "broker", "mybroker");
187         ThingUID thingUID = new ThingUID(RuuviGatewayBindingConstants.THING_TYPE_BEACON,
188                 topic.replaceAll("[:_/]", "_"));
189         ThingBuilder thingBuilder = ThingBuilder.create(RuuviGatewayBindingConstants.THING_TYPE_BEACON, thingUID)
190                 .withBridge(bridgeThingUID).withLabel("Ruuvi " + topic).withConfiguration(configuration);
191
192         CHANNEL_TO_ITEM_TYPE.forEach((channelId, _itemType) -> {
193             thingBuilder.withChannel(ChannelBuilder.create(new ChannelUID(thingUID, channelId)).build());
194         });
195
196         Thing thing = thingBuilder.build();
197         thingProvider.add(thing);
198         waitForAssert(() -> assertNotNull(thing.getHandler()));
199         assertNotNull(thing.getConfiguration());
200         things.add(thing);
201         return thing;
202     }
203
204     private void triggerTimeoutHandling(Thing ruuviThing) {
205         // Simulate some time passing, so that RuuviTagHandler.heartbeat() is called twice
206         // Two heartbeat calls happens to trigger timeout handling in handler, one is not enough.
207         // (this is really implementation detail of RuuviTagHandler, making this test slightly
208         // error prone to possible changes in RuuviTagHandler implementation)
209         //
210         // 0. Assume some data received already, RuuviTagHandler.receivedData is true
211         // 1. First heartbeat sets receivedData=false; no further action is taken yet
212         // 2. Second heartbeat acts on false receivedData, e.g. updating Thing Status
213         for (int i = 0; i < 2; i++) {
214             callInternalHeartbeat(ruuviThing);
215         }
216     }
217
218     private void callInternalHeartbeat(Thing ruuviThing) {
219         ThingHandler handler = ruuviThing.getHandler();
220         Objects.requireNonNull(handler);
221         assertInstanceOf(RuuviTagHandler.class, handler);
222         RuuviTagHandler ruuviHandler = (RuuviTagHandler) handler;
223         try {
224             Method heartbeatMethod = RuuviTagHandler.class.getDeclaredMethod("heartbeat");
225             Objects.requireNonNull(heartbeatMethod);
226             heartbeatMethod.setAccessible(true);
227             heartbeatMethod.invoke(ruuviHandler);
228         } catch (NoSuchMethodException | SecurityException | IllegalAccessException | IllegalArgumentException
229                 | InvocationTargetException e) {
230             fail("Failed to call heartbeat method of thing handler via reflection. Bug in test? Details: "
231                     + e.getClass().getSimpleName() + ": " + e.getMessage());
232             throw new RuntimeException(e);
233         }
234     }
235
236     private String itemName(ChannelUID channelUID) {
237         return channelUID.getAsString().replace(":", "_");
238     }
239
240     private String linkChannelToAutogeneratedItem(ChannelUID channelUID) {
241         String itemName = itemName(channelUID);
242         String itemType = CHANNEL_TO_ITEM_TYPE.get(channelUID.getId());
243         GenericItem item = new CoreItemFactory(mockedUnitProvider).createItem(itemType, itemName);
244         assertNotNull(item, itemType);
245         itemProvider.add(item);
246         itemChannelLinkProvider.add(new ItemChannelLink(itemName, channelUID));
247         return itemName;
248     }
249
250     @Override
251     @BeforeEach
252     public void beforeEach() throws Exception {
253         super.beforeEach();
254
255         statusSubscriber.statusUpdates.clear();
256         registerService(statusSubscriber);
257
258         MQTTTopicDiscoveryService mqttTopicDiscoveryService = getService(MQTTTopicDiscoveryService.class);
259         assertNotNull(mqttTopicDiscoveryService);
260         ruuviDiscoveryService = new RuuviGatewayDiscoveryService(mqttTopicDiscoveryService);
261
262         createMqttBrokerBridge();
263
264         mqttConnection = createBrokerConnection("myclientid");
265
266         // If the connection state changes in between -> fail
267         mqttConnection.addConnectionObserver(failIfChange);
268
269         List<CompletableFuture<Boolean>> futures = new ArrayList<>();
270         futures.add(publish(BASE_TOPIC_RUUVI + "/mygwid/DE:AD:BE:EF:AA:00", "{}"));
271
272         registeredTopics = futures.size();
273         CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(5, TimeUnit.SECONDS);
274
275         scheduler = new ScheduledThreadPoolExecutor(6);
276     }
277
278     @Override
279     @AfterEach
280     public void afterEach() throws Exception {
281         if (mqttConnection != null) {
282             mqttConnection.removeConnectionObserver(failIfChange);
283             mqttConnection.stop().get(5, TimeUnit.SECONDS);
284         }
285         things.stream().map(thing -> thingProvider.remove(thing.getUID()));
286         unregisterService(statusSubscriber);
287
288         if (scheduler != null) {
289             scheduler.shutdownNow();
290         }
291         super.afterEach();
292     }
293
294     @Test
295     public void retrieveAllRuuviPrefixedTopics() throws Exception {
296         CountDownLatch c = new CountDownLatch(registeredTopics);
297         mqttConnection.subscribe(BASE_TOPIC_RUUVI + "/#", (topic, payload) -> c.countDown()).get(5, TimeUnit.SECONDS);
298         assertTrue(c.await(5, TimeUnit.SECONDS),
299                 "Connection " + mqttConnection.getClientId() + " not retrieving all topics ");
300     }
301
302     private void assertThingStatus(List<ThingStatusInfo> statusUpdates, int index, ThingStatus status,
303             @Nullable ThingStatusDetail detail, @Nullable String description) {
304         assertTrue(statusUpdates.size() > index,
305                 String.format("Not enough status updates. Expected %d, but only had %d. Status updates received: %s",
306                         index + 1, statusUpdates.size(),
307                         statusUpdates.stream().map(ThingStatusInfo::getStatus).collect(Collectors.toList())));
308         assertEquals(status, statusUpdates.get(index).getStatus(), statusUpdates.get(index).toString());
309         assertEquals(detail, statusUpdates.get(index).getStatusDetail(), statusUpdates.get(index).toString());
310         assertEquals(description, statusUpdates.get(index).getDescription(), statusUpdates.get(index).toString());
311     }
312
313     @SuppressWarnings("null")
314     private void assertThingStatusWithDescriptionPattern(List<ThingStatusInfo> statusUpdates, int index,
315             ThingStatus status, ThingStatusDetail detail, String descriptionPattern) {
316         assertTrue(statusUpdates.size() > index, "assert " + statusUpdates.size() + " > " + index + " failed");
317         assertEquals(status, statusUpdates.get(index).getStatus(), statusUpdates.get(index).toString());
318         assertEquals(detail, statusUpdates.get(index).getStatusDetail(), statusUpdates.get(index).toString());
319         assertTrue(statusUpdates.get(index).getDescription().matches(descriptionPattern),
320                 statusUpdates.get(index).toString());
321     }
322
323     private void assertThingStatus(List<ThingStatusInfo> statusUpdates, int index, ThingStatus status) {
324         assertThingStatus(statusUpdates, index, status, ThingStatusDetail.NONE, null);
325     }
326
327     private void assertItems(Function<String, State> channelStateGetter, String temperatureCelsius,
328             String accelerationXStandardGravity, String accelerationYStandardGravity,
329             String accelerationZStandardGravity, String batteryVolt, int dataFormat, String humidityPercent,
330             int measurementSequenceNumber, int movementCounter, String pressurePascal, String txPowerDecibelMilliwatts,
331             String rssiDecibelMilliwatts, Instant ts, Instant gwts, String gwMac) {
332         assertEquals(new QuantityType<Temperature>(new BigDecimal(temperatureCelsius), SIUnits.CELSIUS),
333                 channelStateGetter.apply(CHANNEL_ID_TEMPERATURE));
334         assertEquals(
335                 new QuantityType<Acceleration>(new BigDecimal(accelerationXStandardGravity), Units.STANDARD_GRAVITY),
336                 channelStateGetter.apply(CHANNEL_ID_ACCELERATIONX));
337         assertEquals(
338                 new QuantityType<Acceleration>(new BigDecimal(accelerationYStandardGravity), Units.STANDARD_GRAVITY),
339                 channelStateGetter.apply(CHANNEL_ID_ACCELERATIONY));
340         assertEquals(
341                 new QuantityType<Acceleration>(new BigDecimal(accelerationZStandardGravity), Units.STANDARD_GRAVITY),
342                 channelStateGetter.apply(CHANNEL_ID_ACCELERATIONZ));
343         assertEquals(new QuantityType<ElectricPotential>(new BigDecimal(batteryVolt), Units.VOLT),
344                 channelStateGetter.apply(CHANNEL_ID_BATTERY));
345         assertEquals(new DecimalType(dataFormat), channelStateGetter.apply(CHANNEL_ID_DATA_FORMAT));
346         assertEquals(new QuantityType<Dimensionless>(new BigDecimal(humidityPercent), Units.PERCENT),
347                 channelStateGetter.apply(CHANNEL_ID_HUMIDITY));
348         assertEquals(new DecimalType(new BigDecimal(measurementSequenceNumber)),
349                 channelStateGetter.apply(CHANNEL_ID_MEASUREMENT_SEQUENCE_NUMBER));
350         assertEquals(new DecimalType(new BigDecimal(movementCounter)),
351                 channelStateGetter.apply(CHANNEL_ID_MOVEMENT_COUNTER));
352         assertEquals(new QuantityType<Pressure>(new BigDecimal(pressurePascal), SIUnits.PASCAL),
353                 channelStateGetter.apply(CHANNEL_ID_PRESSURE));
354         assertEquals(new QuantityType<Power>(new BigDecimal(txPowerDecibelMilliwatts), Units.DECIBEL_MILLIWATTS),
355                 channelStateGetter.apply(CHANNEL_ID_TX_POWER));
356
357         assertEquals(new QuantityType<Power>(new BigDecimal(rssiDecibelMilliwatts), Units.DECIBEL_MILLIWATTS),
358                 channelStateGetter.apply(CHANNEL_ID_RSSI));
359         assertEquals(new DateTimeType(ts.atZone(ZoneId.of("UTC"))), channelStateGetter.apply(CHANNEL_ID_TS));
360         assertEquals(new DateTimeType(gwts.atZone(ZoneId.of("UTC"))), channelStateGetter.apply(CHANNEL_ID_GWTS));
361         assertEquals(new StringType(gwMac), channelStateGetter.apply(CHANNEL_ID_GWMAC));
362     }
363
364     @ParameterizedTest
365     @CsvSource(delimiter = '@', value = { //
366             BASE_TOPIC_RUUVI + "mygwid/DE:AD:BE:EF:AA:01 @" + "{}", // empty json
367             BASE_TOPIC_RUUVI + "mygwid/DE:AD:BE:EF:BB:03 @" + "invalid json", // invalid json
368             BASE_TOPIC_RUUVI + "mygwid/DE:AD:BE:EF:BB:04 @" + "0201061BFF990405", // payload too short
369             BASE_TOPIC_RUUVI + "mygwid/DE:AD:BE:EF:BB:05 @"
370                     + "0201061BFF99050512FC5394C37C0004FFFC040CAC364200CDCBB8334C884F", // wrong manufacturer id (the
371                                                                                         // two bytes after FF do not
372                                                                                         // match 99 04)
373             BASE_TOPIC_RUUVI + "mygwid/DE:AD:BE:EF:BB:06 @"
374                     + "0201061BFA99040512FC5394C37C0004FFFC040CAC364200CDCBB8334C884F", // unexpected advertisement (no
375                                                                                         // FF to indicate 'manufacturer
376                                                                                         // specific' advertisement)
377             BASE_TOPIC_RUUVI + "mygwid/DE:AD:BE:EF:BB:07 @" + "{" + "  \"gw_mac\": \"DE:AD:BE:EF:00\","
378                     + "  \"rssi\": -82," + "  \"aoa\": [],"
379                     // data field is number, not a string
380                     + "  \"gwts\": \"1659365432\"," + "  \"ts\": \"1659365222\"," + "  \"data\": 999,"
381                     + "  \"coords\": \"\" }", // wrong json data types
382     })
383     public void testInvalidCases(String topic, String val) throws Exception {
384         final String jsonPayload;
385         if (val.contains("{")) {
386             // test argument is specifiying the whole json payload
387             jsonPayload = val;
388         } else {
389             // test argument is only specifiying the data field in the json payload
390             // Fill rest of the fields with some valid values
391             jsonPayload = "{" + "  \"gw_mac\": \"DE:AD:BE:EF:00\"," + "  \"rssi\": -82," + "  \"aoa\": [],"
392                     + "  \"gwts\": \"1659365432\"," + "  \"ts\": \"1659365222\"," + "  \"data\": \"" + val + "\","
393                     + "  \"coords\": \"\" }";
394         }
395
396         Thing ruuviThing = createRuuviThing("mygwid", topic, 100);
397         waitForAssert(() -> {
398             List<ThingStatusInfo> statusUpdates = statusSubscriber.statusUpdates.get(ruuviThing.getUID());
399             assertNotNull(statusUpdates);
400             int statusUpdateIndex = 0;
401             assertThingStatus(statusUpdates, statusUpdateIndex++, ThingStatus.INITIALIZING);
402             assertThingStatus(statusUpdates, statusUpdateIndex++, ThingStatus.UNKNOWN);
403             assertThingStatus(statusUpdates, statusUpdateIndex++, ThingStatus.ONLINE, ThingStatusDetail.NONE,
404                     "Waiting for initial data");
405             assertThingStatus(statusUpdates, statusUpdateIndex++, ThingStatus.OFFLINE,
406                     ThingStatusDetail.COMMUNICATION_ERROR, "No valid data received for some time");
407             scheduler.execute(() -> publish(topic, jsonPayload));
408             assertThingStatusWithDescriptionPattern(statusUpdates, statusUpdateIndex++, ThingStatus.OFFLINE,
409                     ThingStatusDetail.COMMUNICATION_ERROR, ".*could not be parsed.*");
410             assertEquals(statusUpdateIndex, statusUpdates.size());
411         });
412     }
413
414     @SuppressWarnings("null")
415     @Test
416     public void testDiscovery() {
417         scheduler.execute(() -> publish(BASE_TOPIC_RUUVI + "/mygwid/DE:AD:BE:EF:BB:02",
418                 "{" + " \"gw_mac\": \"DE:AD:BE:EF:00\"," + " \"rssi\": -82," + " \"aoa\": [],"
419                         + " \"gwts\": \"1659365432\"," + " \"ts\": \"1659365222\","
420                         + " \"data\": \"0201061BFF99040512FC5394C37C0004FFFC040CAC364200CDCBB8334C884F\","
421                         + " \"coords\": \"\" }"));
422         waitForAssert(() -> {
423             assertEquals(2, inbox.getAll().size(), inbox.getAll().toString());
424             var discovered = new HashSet<DiscoveryResult>();
425             discovered.addAll(inbox.getAll());
426
427             for (var result : discovered) {
428                 assertEquals(THING_TYPE_BEACON, result.getThingTypeUID());
429                 assertEquals("topic", result.getRepresentationProperty());
430                 Object topic = result.getProperties().get("topic");
431                 assertNotNull(topic);
432                 assertTrue(
433                         // published in this test
434                         topic.equals((BASE_TOPIC_RUUVI + "/mygwid/DE:AD:BE:EF:BB:02"))
435                                 // published in beforeEach
436                                 || result.getProperties().get("topic")
437                                         .equals((BASE_TOPIC_RUUVI + "/mygwid/DE:AD:BE:EF:AA:00")));
438             }
439         });
440     }
441
442     @ParameterizedTest
443     @ValueSource(booleans = { true, false })
444     public void testHappyFlow(boolean quickTimeout) {
445         // with quickTimeout=false, heartbeat is effectively disabled. Thing will not "timeout" and go OFFLINE
446         // with quickTimeout=true, timeout happens very fast. In CI we use infinite timeout and trigger timeout manually
447
448         Thing ruuviThing = createRuuviThing("mygwid", BASE_TOPIC_RUUVI + "/mygwid/DE:AD:BE:EF:BB:02",
449                 quickTimeout ? (isRunningInCI() ? 9_000_000 : 100) : 9_000_000);
450         // Link all channels to freshly created items
451         ruuviThing.getChannels().stream().map(Channel::getUID).forEach(this::linkChannelToAutogeneratedItem);
452
453         @SuppressWarnings("null")
454         Function<String, State> getItemState = channelId -> itemRegistry
455                 .get(itemName(ruuviThing.getChannel(channelId).getUID())).getState();
456
457         AtomicInteger statusUpdateIndex = new AtomicInteger();
458         waitForAssert(() -> {
459             List<ThingStatusInfo> statusUpdates = statusSubscriber.statusUpdates.get(ruuviThing.getUID());
460             assertNotNull(statusUpdates);
461
462             assertThingStatus(statusUpdates, statusUpdateIndex.get(), ThingStatus.INITIALIZING);
463             assertThingStatus(statusUpdates, statusUpdateIndex.get() + 1, ThingStatus.UNKNOWN);
464             assertThingStatus(statusUpdates, statusUpdateIndex.get() + 2, ThingStatus.ONLINE, ThingStatusDetail.NONE,
465                     "Waiting for initial data");
466
467             statusUpdateIndex.set(statusUpdateIndex.get() + 3);
468         });
469
470         List<ThingStatusInfo> statusUpdates = statusSubscriber.statusUpdates.get(ruuviThing.getUID());
471         assertNotNull(statusUpdates);
472         if (quickTimeout) {
473             if (isRunningInCI()) {
474                 triggerTimeoutHandling(ruuviThing);
475             }
476             waitForAssert(() -> {
477                 assertThingStatus(statusUpdates, statusUpdateIndex.get(), ThingStatus.OFFLINE,
478                         ThingStatusDetail.COMMUNICATION_ERROR, "No valid data received for some time");
479
480                 CHANNEL_TO_ITEM_TYPE.keySet()
481                         .forEach(channelId -> assertEquals(UnDefType.UNDEF, getItemState.apply(channelId)));
482                 statusUpdateIndex.incrementAndGet();
483             });
484         }
485
486         // publish some valid data ("valid case" test vector from
487         // https://docs.ruuvi.com/communication/bluetooth-advertisements/data-format-5-rawv2)
488         scheduler.execute(() -> publish(BASE_TOPIC_RUUVI + "/mygwid/DE:AD:BE:EF:BB:02",
489                 "{" + " \"gw_mac\": \"DE:AD:BE:EF:00\"," + " \"rssi\": -82," + " \"aoa\": [],"
490                         + " \"gwts\": \"1659365432\"," + " \"ts\": \"1659365222\","
491                         + " \"data\": \"0201061BFF99040512FC5394C37C0004FFFC040CAC364200CDCBB8334C884F\","
492                         + " \"coords\": \"\" }"));
493
494         waitForAssert(() -> {
495             assertThingStatus(statusUpdates, statusUpdateIndex.get(), ThingStatus.ONLINE);
496             statusUpdateIndex.incrementAndGet();
497         });
498
499         waitForAssert(() -> {
500             assertItems(getItemState, //
501                     "24.3", // temperature, Celsius
502                     "0.004", // acc X, g
503                     "-0.004", // acc Y, g
504                     "1.036", // acc Z, g
505                     "2.9770000000000003", // battery, volt
506                     5, // data format
507                     "53.49", // humidity %
508                     205, // measurement seq
509                     66, // movement
510                     "100044", // pressure, pascal
511                     "4", // tx power, dBm
512                     "-82", // RSSI, dBm
513                     Instant.ofEpochSecond(1659365222), // ts
514                     Instant.ofEpochSecond(1659365432), // gwts
515                     "DE:AD:BE:EF:00" // gw mac
516             );
517         });
518
519         if (quickTimeout) {
520             if (isRunningInCI()) {
521                 triggerTimeoutHandling(ruuviThing);
522             }
523             waitForAssert(() -> {
524                 assertThingStatus(statusUpdates, statusUpdateIndex.get(), ThingStatus.OFFLINE,
525                         ThingStatusDetail.COMMUNICATION_ERROR, "No valid data received for some time");
526                 CHANNEL_TO_ITEM_TYPE.keySet()
527                         .forEach(channelId -> assertEquals(UnDefType.UNDEF, getItemState.apply(channelId)));
528                 statusUpdateIndex.incrementAndGet();
529             });
530
531         }
532
533         // Another mqtt update (("minimum values" test vector from
534         // https://docs.ruuvi.com/communication/bluetooth-advertisements/data-format-5-rawv2)
535         scheduler.execute(() -> publish(BASE_TOPIC_RUUVI + "/mygwid/DE:AD:BE:EF:BB:02",
536                 "{" + " \"gw_mac\": \"DE:AD:BE:EF:00\"," + " \"rssi\": -66," + " \"aoa\": [],"
537                         + " \"gwts\": \"1659365431\"," + " \"ts\": \"1659365221\","
538                         + " \"data\": \"0201061BFF9904058001000000008001800180010000000000CBB8334C884F\","
539                         + " \"coords\": \"\" }"));
540         if (quickTimeout) {
541             // With quick timeout we were previously offline, so now we should be back online
542             // with valid channels.
543             waitForAssert(() -> {
544                 assertThingStatus(statusUpdates, statusUpdateIndex.get(), ThingStatus.ONLINE);
545                 statusUpdateIndex.getAndIncrement();
546             });
547
548             // ...after a while all items are updated
549             waitForAssert(() -> {
550                 assertItems(getItemState, //
551                         "-163.835", // temperature, Celsius
552                         "-32.767", // acc X, g
553                         "-32.767", // acc Y, g
554                         "-32.767", // acc Z, g
555                         "1.6", // battery, volt
556                         5, // data format
557                         "0.0", // humidity %
558                         0, // measurement seq
559                         0, // movement
560                         "50000", // pressure, pascal
561                         "-40", // tx power, dBm
562                         "-66", // RSSI, dBm
563                         Instant.ofEpochSecond(1659365221), // ts
564                         Instant.ofEpochSecond(1659365431), // gwts
565                         "DE:AD:BE:EF:00" // gw mac
566                 );
567             });
568
569             // ...after which timeout will happen again
570             if (isRunningInCI()) {
571                 triggerTimeoutHandling(ruuviThing);
572             }
573             waitForAssert(() -> {
574                 assertThingStatus(statusUpdates, statusUpdateIndex.get(), ThingStatus.OFFLINE,
575                         ThingStatusDetail.COMMUNICATION_ERROR, "No valid data received for some time");
576                 CHANNEL_TO_ITEM_TYPE.keySet()
577                         .forEach(channelId -> assertEquals(UnDefType.UNDEF, getItemState.apply(channelId)));
578                 statusUpdateIndex.getAndIncrement();
579             });
580         } else {
581             // with non-quick timeout we are still online, and items are updated
582             waitForAssert(() -> {
583                 assertItems(getItemState, //
584                         "-163.835", // temperature, Celsius
585                         "-32.767", // acc X, g
586                         "-32.767", // acc Y, g
587                         "-32.767", // acc Z, g
588                         "1.6", // battery, volt
589                         5, // data format
590                         "0.0", // humidity %
591                         0, // measurement seq
592                         0, // movement
593                         "50000", // pressure, pascal
594                         "-40", // tx power, dBm
595                         "-66", // RSSI, dBm
596                         Instant.ofEpochSecond(1659365221), // ts
597                         Instant.ofEpochSecond(1659365431), // gwts
598                         "DE:AD:BE:EF:00" // gw mac
599                 );
600             });
601         }
602
603         // assert that we have processed all status updates
604         assertEquals(statusUpdateIndex.get(), statusUpdates.size());
605     }
606 }