]> git.basschouten.com Git - openhab-addons.git/blob
3e1602309647a67d01c70994740ce4c8a71b60ab
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2023 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.mqtt.ruuvigateway;
14
15 import static org.hamcrest.CoreMatchers.is;
16 import static org.hamcrest.MatcherAssert.assertThat;
17 import static org.junit.jupiter.api.Assertions.*;
18 import static org.openhab.binding.mqtt.ruuvigateway.internal.RuuviGatewayBindingConstants.*;
19
20 import java.lang.reflect.InvocationTargetException;
21 import java.lang.reflect.Method;
22 import java.math.BigDecimal;
23 import java.time.Instant;
24 import java.time.ZoneId;
25 import java.util.ArrayList;
26 import java.util.HashMap;
27 import java.util.HashSet;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.Objects;
31 import java.util.Set;
32 import java.util.concurrent.CompletableFuture;
33 import java.util.concurrent.CountDownLatch;
34 import java.util.concurrent.ScheduledExecutorService;
35 import java.util.concurrent.ScheduledThreadPoolExecutor;
36 import java.util.concurrent.TimeUnit;
37 import java.util.concurrent.atomic.AtomicInteger;
38 import java.util.function.Function;
39 import java.util.stream.Collectors;
40
41 import javax.measure.quantity.Acceleration;
42 import javax.measure.quantity.Dimensionless;
43 import javax.measure.quantity.ElectricPotential;
44 import javax.measure.quantity.Power;
45 import javax.measure.quantity.Pressure;
46 import javax.measure.quantity.Temperature;
47
48 import org.eclipse.jdt.annotation.NonNullByDefault;
49 import org.eclipse.jdt.annotation.Nullable;
50 import org.junit.jupiter.api.AfterEach;
51 import org.junit.jupiter.api.BeforeEach;
52 import org.junit.jupiter.api.Test;
53 import org.junit.jupiter.params.ParameterizedTest;
54 import org.junit.jupiter.params.provider.CsvSource;
55 import org.junit.jupiter.params.provider.ValueSource;
56 import org.openhab.binding.mqtt.discovery.MQTTTopicDiscoveryService;
57 import org.openhab.binding.mqtt.ruuvigateway.internal.RuuviGatewayBindingConstants;
58 import org.openhab.binding.mqtt.ruuvigateway.internal.discovery.RuuviGatewayDiscoveryService;
59 import org.openhab.binding.mqtt.ruuvigateway.internal.handler.RuuviTagHandler;
60 import org.openhab.core.config.core.Configuration;
61 import org.openhab.core.config.discovery.DiscoveryResult;
62 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
63 import org.openhab.core.io.transport.mqtt.MqttConnectionObserver;
64 import org.openhab.core.io.transport.mqtt.MqttConnectionState;
65 import org.openhab.core.items.GenericItem;
66 import org.openhab.core.library.CoreItemFactory;
67 import org.openhab.core.library.types.DateTimeType;
68 import org.openhab.core.library.types.DecimalType;
69 import org.openhab.core.library.types.QuantityType;
70 import org.openhab.core.library.types.StringType;
71 import org.openhab.core.library.unit.SIUnits;
72 import org.openhab.core.library.unit.Units;
73 import org.openhab.core.thing.Bridge;
74 import org.openhab.core.thing.Channel;
75 import org.openhab.core.thing.ChannelUID;
76 import org.openhab.core.thing.Thing;
77 import org.openhab.core.thing.ThingStatus;
78 import org.openhab.core.thing.ThingStatusDetail;
79 import org.openhab.core.thing.ThingStatusInfo;
80 import org.openhab.core.thing.ThingTypeUID;
81 import org.openhab.core.thing.ThingUID;
82 import org.openhab.core.thing.binding.ThingHandler;
83 import org.openhab.core.thing.binding.builder.BridgeBuilder;
84 import org.openhab.core.thing.binding.builder.ChannelBuilder;
85 import org.openhab.core.thing.binding.builder.ThingBuilder;
86 import org.openhab.core.thing.link.ItemChannelLink;
87 import org.openhab.core.types.State;
88 import org.openhab.core.types.UnDefType;
89
90 /**
91  * A full implementation test, that starts the embedded MQTT broker and publishes test data
92  *
93  * @author David Graeff - Initial contribution
94  * @author Sami Salonen - Adapted and extended to Ruuvi Gateway tests
95  */
96 @NonNullByDefault
97 public class RuuviGatewayTest extends MqttOSGiTest {
98     private static final String BASE_TOPIC_RUUVI = "ruuvi";
99     private static final Map<String, String> CHANNEL_TO_ITEM_TYPE = new HashMap<>();
100     static {
101         CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_ACCELERATIONX, "Number:Acceleration");
102         CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_ACCELERATIONY, "Number:Acceleration");
103         CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_ACCELERATIONZ, "Number:Acceleration");
104         CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_BATTERY, "Number:ElectricPotential");
105         CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_DATA_FORMAT, "Number");
106         CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_HUMIDITY, "Number:Dimensionless");
107         CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_MEASUREMENT_SEQUENCE_NUMBER, "Number");
108         CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_MOVEMENT_COUNTER, "Number");
109         CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_PRESSURE, "Number:Pressure");
110         CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_TEMPERATURE, "Number:Temperature");
111         CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_TX_POWER, "Number:Power");
112         CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_RSSI, "Number:Power");
113         CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_TS, "DateTime");
114         CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_GWTS, "DateTime");
115         CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_GWMAC, "String");
116     }
117
    // Exposes the status updates of each thing, keyed by thing UID; cleared and registered as a service in
    // beforeEach()
    private ThingStatusInfoChangedSubscriber statusSubscriber = new ThingStatusInfoChangedSubscriber();
    // Client connection obtained from createBrokerConnection("myclientid") in beforeEach(); stopped in afterEach()
    private @NonNullByDefault({}) MqttBrokerConnection mqttConnection;
    // Number of topics published in beforeEach(); the initial value 100 is overwritten there
    private int registeredTopics = 100;

    // Executes the asynchronous publish calls of the tests; created in beforeEach(), shut down in afterEach()
    private @NonNullByDefault({}) ScheduledExecutorService scheduler;

    /**
     * Create an observer that fails the test as soon as the broker client connection changes its connection state
     * to something else than CONNECTED.
     */
    private MqttConnectionObserver failIfChange = (state, error) -> assertThat(state,
            is(MqttConnectionState.CONNECTED));

    @SuppressWarnings("unused") // used indirectly with Inbox
    private @NonNullByDefault({}) RuuviGatewayDiscoveryService ruuviDiscoveryService;
    // Things (including the broker bridge) created by createMqttBrokerBridge() and createRuuviThing()
    private Set<Thing> things = new HashSet<>();
134
135     private Bridge createMqttBrokerBridge() {
136         Configuration configuration = new Configuration();
137         configuration.put("host", "127.0.0.1");
138         configuration.put("port", brokerConnection.getPort());
139         Bridge bridge = BridgeBuilder.create(new ThingTypeUID("mqtt", "broker"), "mybroker").withLabel("MQTT Broker")
140                 .withConfiguration(configuration).build();
141         thingProvider.add(bridge);
142         waitForAssert(() -> assertNotNull(bridge.getHandler()));
143         assertNotNull(bridge.getConfiguration());
144         things.add(bridge);
145         return bridge;
146     }
147
148     private Thing createRuuviThing(String brokerPrefix, String topic, @Nullable Integer timeoutMillisecs) {
149         Configuration configuration = new Configuration();
150         configuration.put(RuuviGatewayBindingConstants.CONFIGURATION_PROPERTY_TOPIC, topic);
151         if (timeoutMillisecs != null) {
152             configuration.put(RuuviGatewayBindingConstants.CONFIGURATION_PROPERTY_TIMEOUT, timeoutMillisecs);
153         }
154         ThingUID bridgeThingUID = new ThingUID("mqtt", "broker", "mybroker");
155         ThingUID thingUID = new ThingUID(RuuviGatewayBindingConstants.THING_TYPE_BEACON,
156                 topic.replaceAll("[:_/]", "_"));
157         ThingBuilder thingBuilder = ThingBuilder.create(RuuviGatewayBindingConstants.THING_TYPE_BEACON, thingUID)
158                 .withBridge(bridgeThingUID).withLabel("Ruuvi " + topic).withConfiguration(configuration);
159
160         CHANNEL_TO_ITEM_TYPE.forEach((channelId, _itemType) -> {
161             thingBuilder.withChannel(ChannelBuilder.create(new ChannelUID(thingUID, channelId)).build());
162         });
163
164         Thing thing = thingBuilder.build();
165         thingProvider.add(thing);
166         waitForAssert(() -> assertNotNull(thing.getHandler()));
167         assertNotNull(thing.getConfiguration());
168         things.add(thing);
169         return thing;
170     }
171
172     private void triggerTimeoutHandling(Thing ruuviThing) {
173         // Simulate some time passing, so that RuuviTagHandler.heartbeat() is called twice
174         // Two heartbeat calls happens to trigger timeout handling in handler, one is not enough.
175         // (this is really implementation detail of RuuviTagHandler, making this test slightly
176         // error prone to possible changes in RuuviTagHandler implementation)
177         //
178         // 0. Assume some data received already, RuuviTagHandler.receivedData is true
179         // 1. First heartbeat sets receivedData=false; no further action is taken yet
180         // 2. Second heartbeat acts on false receivedData, e.g. updating Thing Status
181         for (int i = 0; i < 2; i++) {
182             callInternalHeartbeat(ruuviThing);
183         }
184     }
185
186     private void callInternalHeartbeat(Thing ruuviThing) {
187         ThingHandler handler = ruuviThing.getHandler();
188         Objects.requireNonNull(handler);
189         assertInstanceOf(RuuviTagHandler.class, handler);
190         RuuviTagHandler ruuviHandler = (RuuviTagHandler) handler;
191         try {
192             Method heartbeatMethod = RuuviTagHandler.class.getDeclaredMethod("heartbeat");
193             Objects.requireNonNull(heartbeatMethod);
194             heartbeatMethod.setAccessible(true);
195             heartbeatMethod.invoke(ruuviHandler);
196         } catch (NoSuchMethodException | SecurityException | IllegalAccessException | IllegalArgumentException
197                 | InvocationTargetException e) {
198             fail("Failed to call heartbeat method of thing handler via reflection. Bug in test? Details: "
199                     + e.getClass().getSimpleName() + ": " + e.getMessage());
200             throw new RuntimeException(e);
201         }
202     }
203
204     private String itemName(ChannelUID channelUID) {
205         return channelUID.getAsString().replace(":", "_");
206     }
207
208     private String linkChannelToAutogeneratedItem(ChannelUID channelUID) {
209         String itemName = itemName(channelUID);
210         String itemType = CHANNEL_TO_ITEM_TYPE.get(channelUID.getId());
211         GenericItem item = new CoreItemFactory().createItem(itemType, itemName);
212         assertNotNull(item, itemType);
213         itemProvider.add(item);
214         itemChannelLinkProvider.add(new ItemChannelLink(itemName, channelUID));
215         return itemName;
216     }
217
218     @Override
219     @BeforeEach
220     public void beforeEach() throws Exception {
221         super.beforeEach();
222
223         statusSubscriber.statusUpdates.clear();
224         registerService(statusSubscriber);
225
226         MQTTTopicDiscoveryService mqttTopicDiscoveryService = getService(MQTTTopicDiscoveryService.class);
227         assertNotNull(mqttTopicDiscoveryService);
228         ruuviDiscoveryService = new RuuviGatewayDiscoveryService(mqttTopicDiscoveryService);
229
230         createMqttBrokerBridge();
231
232         mqttConnection = createBrokerConnection("myclientid");
233
234         // If the connection state changes in between -> fail
235         mqttConnection.addConnectionObserver(failIfChange);
236
237         List<CompletableFuture<Boolean>> futures = new ArrayList<>();
238         futures.add(publish(BASE_TOPIC_RUUVI + "/mygwid/DE:AD:BE:EF:AA:00", "{}"));
239
240         registeredTopics = futures.size();
241         CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(5, TimeUnit.SECONDS);
242
243         scheduler = new ScheduledThreadPoolExecutor(6);
244     }
245
246     @Override
247     @AfterEach
248     public void afterEach() throws Exception {
249         if (mqttConnection != null) {
250             mqttConnection.removeConnectionObserver(failIfChange);
251             mqttConnection.stop().get(5, TimeUnit.SECONDS);
252         }
253         things.stream().map(thing -> thingProvider.remove(thing.getUID()));
254         unregisterService(statusSubscriber);
255
256         if (scheduler != null) {
257             scheduler.shutdownNow();
258         }
259         super.afterEach();
260     }
261
262     @Test
263     public void retrieveAllRuuviPrefixedTopics() throws Exception {
264         CountDownLatch c = new CountDownLatch(registeredTopics);
265         mqttConnection.subscribe(BASE_TOPIC_RUUVI + "/#", (topic, payload) -> c.countDown()).get(5, TimeUnit.SECONDS);
266         assertTrue(c.await(5, TimeUnit.SECONDS),
267                 "Connection " + mqttConnection.getClientId() + " not retrieving all topics ");
268     }
269
270     private void assertThingStatus(List<ThingStatusInfo> statusUpdates, int index, ThingStatus status,
271             @Nullable ThingStatusDetail detail, @Nullable String description) {
272         assertTrue(statusUpdates.size() > index,
273                 String.format("Not enough status updates. Expected %d, but only had %d. Status updates received: %s",
274                         index + 1, statusUpdates.size(),
275                         statusUpdates.stream().map(ThingStatusInfo::getStatus).collect(Collectors.toList())));
276         assertEquals(status, statusUpdates.get(index).getStatus(), statusUpdates.get(index).toString());
277         assertEquals(detail, statusUpdates.get(index).getStatusDetail(), statusUpdates.get(index).toString());
278         assertEquals(description, statusUpdates.get(index).getDescription(), statusUpdates.get(index).toString());
279     }
280
281     @SuppressWarnings("null")
282     private void assertThingStatusWithDescriptionPattern(List<ThingStatusInfo> statusUpdates, int index,
283             ThingStatus status, ThingStatusDetail detail, String descriptionPattern) {
284         assertTrue(statusUpdates.size() > index, "assert " + statusUpdates.size() + " > " + index + " failed");
285         assertEquals(status, statusUpdates.get(index).getStatus(), statusUpdates.get(index).toString());
286         assertEquals(detail, statusUpdates.get(index).getStatusDetail(), statusUpdates.get(index).toString());
287         assertTrue(statusUpdates.get(index).getDescription().matches(descriptionPattern),
288                 statusUpdates.get(index).toString());
289     }
290
291     private void assertThingStatus(List<ThingStatusInfo> statusUpdates, int index, ThingStatus status) {
292         assertThingStatus(statusUpdates, index, status, ThingStatusDetail.NONE, null);
293     }
294
295     private void assertItems(Function<String, State> channelStateGetter, String temperatureCelsius,
296             String accelerationXStandardGravity, String accelerationYStandardGravity,
297             String accelerationZStandardGravity, String batteryVolt, int dataFormat, String humidityPercent,
298             int measurementSequenceNumber, int movementCounter, String pressurePascal, String txPowerDecibelMilliwatts,
299             String rssiDecibelMilliwatts, Instant ts, Instant gwts, String gwMac) {
300         assertEquals(new QuantityType<Temperature>(new BigDecimal(temperatureCelsius), SIUnits.CELSIUS),
301                 channelStateGetter.apply(CHANNEL_ID_TEMPERATURE));
302         assertEquals(
303                 new QuantityType<Acceleration>(new BigDecimal(accelerationXStandardGravity), Units.STANDARD_GRAVITY),
304                 channelStateGetter.apply(CHANNEL_ID_ACCELERATIONX));
305         assertEquals(
306                 new QuantityType<Acceleration>(new BigDecimal(accelerationYStandardGravity), Units.STANDARD_GRAVITY),
307                 channelStateGetter.apply(CHANNEL_ID_ACCELERATIONY));
308         assertEquals(
309                 new QuantityType<Acceleration>(new BigDecimal(accelerationZStandardGravity), Units.STANDARD_GRAVITY),
310                 channelStateGetter.apply(CHANNEL_ID_ACCELERATIONZ));
311         assertEquals(new QuantityType<ElectricPotential>(new BigDecimal(batteryVolt), Units.VOLT),
312                 channelStateGetter.apply(CHANNEL_ID_BATTERY));
313         assertEquals(new DecimalType(dataFormat), channelStateGetter.apply(CHANNEL_ID_DATA_FORMAT));
314         assertEquals(new QuantityType<Dimensionless>(new BigDecimal(humidityPercent), Units.PERCENT),
315                 channelStateGetter.apply(CHANNEL_ID_HUMIDITY));
316         assertEquals(new DecimalType(new BigDecimal(measurementSequenceNumber)),
317                 channelStateGetter.apply(CHANNEL_ID_MEASUREMENT_SEQUENCE_NUMBER));
318         assertEquals(new DecimalType(new BigDecimal(movementCounter)),
319                 channelStateGetter.apply(CHANNEL_ID_MOVEMENT_COUNTER));
320         assertEquals(new QuantityType<Pressure>(new BigDecimal(pressurePascal), SIUnits.PASCAL),
321                 channelStateGetter.apply(CHANNEL_ID_PRESSURE));
322         assertEquals(new QuantityType<Power>(new BigDecimal(txPowerDecibelMilliwatts), Units.DECIBEL_MILLIWATTS),
323                 channelStateGetter.apply(CHANNEL_ID_TX_POWER));
324
325         assertEquals(new QuantityType<Power>(new BigDecimal(rssiDecibelMilliwatts), Units.DECIBEL_MILLIWATTS),
326                 channelStateGetter.apply(CHANNEL_ID_RSSI));
327         assertEquals(new DateTimeType(ts.atZone(ZoneId.of("UTC"))), channelStateGetter.apply(CHANNEL_ID_TS));
328         assertEquals(new DateTimeType(gwts.atZone(ZoneId.of("UTC"))), channelStateGetter.apply(CHANNEL_ID_GWTS));
329         assertEquals(new StringType(gwMac), channelStateGetter.apply(CHANNEL_ID_GWMAC));
330     }
331
332     @ParameterizedTest
333     @CsvSource(delimiter = '@', value = { //
334             BASE_TOPIC_RUUVI + "mygwid/DE:AD:BE:EF:AA:01 @" + "{}", // empty json
335             BASE_TOPIC_RUUVI + "mygwid/DE:AD:BE:EF:BB:03 @" + "invalid json", // invalid json
336             BASE_TOPIC_RUUVI + "mygwid/DE:AD:BE:EF:BB:04 @" + "0201061BFF990405", // payload too short
337             BASE_TOPIC_RUUVI + "mygwid/DE:AD:BE:EF:BB:05 @"
338                     + "0201061BFF99050512FC5394C37C0004FFFC040CAC364200CDCBB8334C884F", // wrong manufacturer id (the
339                                                                                         // two bytes after FF do not
340                                                                                         // match 99 04)
341             BASE_TOPIC_RUUVI + "mygwid/DE:AD:BE:EF:BB:06 @"
342                     + "0201061BFA99040512FC5394C37C0004FFFC040CAC364200CDCBB8334C884F", // unexpected advertisement (no
343                                                                                         // FF to indicate 'manufacturer
344                                                                                         // specific' advertisement)
345             BASE_TOPIC_RUUVI + "mygwid/DE:AD:BE:EF:BB:07 @" + "{" + "  \"gw_mac\": \"DE:AD:BE:EF:00\","
346                     + "  \"rssi\": -82," + "  \"aoa\": [],"
347                     // data field is number, not a string
348                     + "  \"gwts\": \"1659365432\"," + "  \"ts\": \"1659365222\"," + "  \"data\": 999,"
349                     + "  \"coords\": \"\" }", // wrong json data types
350     })
351     public void testInvalidCases(String topic, String val) throws Exception {
352         final String jsonPayload;
353         if (val.contains("{")) {
354             // test argument is specifiying the whole json payload
355             jsonPayload = val;
356         } else {
357             // test argument is only specifiying the data field in the json payload
358             // Fill rest of the fields with some valid values
359             jsonPayload = "{" + "  \"gw_mac\": \"DE:AD:BE:EF:00\"," + "  \"rssi\": -82," + "  \"aoa\": [],"
360                     + "  \"gwts\": \"1659365432\"," + "  \"ts\": \"1659365222\"," + "  \"data\": \"" + val + "\","
361                     + "  \"coords\": \"\" }";
362         }
363
364         Thing ruuviThing = createRuuviThing("mygwid", topic, 100);
365         waitForAssert(() -> {
366             List<ThingStatusInfo> statusUpdates = statusSubscriber.statusUpdates.get(ruuviThing.getUID());
367             assertNotNull(statusUpdates);
368             int statusUpdateIndex = 0;
369             assertThingStatus(statusUpdates, statusUpdateIndex++, ThingStatus.INITIALIZING);
370             assertThingStatus(statusUpdates, statusUpdateIndex++, ThingStatus.UNKNOWN);
371             assertThingStatus(statusUpdates, statusUpdateIndex++, ThingStatus.ONLINE, ThingStatusDetail.NONE,
372                     "Waiting for initial data");
373             assertThingStatus(statusUpdates, statusUpdateIndex++, ThingStatus.OFFLINE,
374                     ThingStatusDetail.COMMUNICATION_ERROR, "No valid data received for some time");
375             scheduler.execute(() -> publish(topic, jsonPayload));
376             assertThingStatusWithDescriptionPattern(statusUpdates, statusUpdateIndex++, ThingStatus.OFFLINE,
377                     ThingStatusDetail.COMMUNICATION_ERROR, ".*could not be parsed.*");
378             assertEquals(statusUpdateIndex, statusUpdates.size());
379         });
380     }
381
382     @SuppressWarnings("null")
383     @Test
384     public void testDiscovery() {
385         scheduler.execute(() -> publish(BASE_TOPIC_RUUVI + "/mygwid/DE:AD:BE:EF:BB:02",
386                 "{" + " \"gw_mac\": \"DE:AD:BE:EF:00\"," + " \"rssi\": -82," + " \"aoa\": [],"
387                         + " \"gwts\": \"1659365432\"," + " \"ts\": \"1659365222\","
388                         + " \"data\": \"0201061BFF99040512FC5394C37C0004FFFC040CAC364200CDCBB8334C884F\","
389                         + " \"coords\": \"\" }"));
390         waitForAssert(() -> {
391             assertEquals(2, inbox.getAll().size(), inbox.getAll().toString());
392             var discovered = new HashSet<DiscoveryResult>();
393             discovered.addAll(inbox.getAll());
394
395             for (var result : discovered) {
396                 assertEquals(THING_TYPE_BEACON, result.getThingTypeUID());
397                 assertEquals("topic", result.getRepresentationProperty());
398                 Object topic = result.getProperties().get("topic");
399                 assertNotNull(topic);
400                 assertTrue(
401                         // published in this test
402                         topic.equals((BASE_TOPIC_RUUVI + "/mygwid/DE:AD:BE:EF:BB:02"))
403                                 // published in beforeEach
404                                 || result.getProperties().get("topic")
405                                         .equals((BASE_TOPIC_RUUVI + "/mygwid/DE:AD:BE:EF:AA:00")));
406             }
407         });
408     }
409
410     @ParameterizedTest
411     @ValueSource(booleans = { true, false })
412     public void testHappyFlow(boolean quickTimeout) {
413         // with quickTimeout=false, heartbeat is effectively disabled. Thing will not "timeout" and go OFFLINE
414         // with quickTimeout=true, timeout happens very fast. In CI we use infinite timeout and trigger timeout manually
415
416         Thing ruuviThing = createRuuviThing("mygwid", BASE_TOPIC_RUUVI + "/mygwid/DE:AD:BE:EF:BB:02",
417                 quickTimeout ? (isRunningInCI() ? 9_000_000 : 100) : 9_000_000);
418         // Link all channels to freshly created items
419         ruuviThing.getChannels().stream().map(Channel::getUID).forEach(this::linkChannelToAutogeneratedItem);
420
421         @SuppressWarnings("null")
422         Function<String, State> getItemState = channelId -> itemRegistry
423                 .get(itemName(ruuviThing.getChannel(channelId).getUID())).getState();
424
425         AtomicInteger statusUpdateIndex = new AtomicInteger();
426         waitForAssert(() -> {
427             List<ThingStatusInfo> statusUpdates = statusSubscriber.statusUpdates.get(ruuviThing.getUID());
428             assertNotNull(statusUpdates);
429
430             assertThingStatus(statusUpdates, statusUpdateIndex.get(), ThingStatus.INITIALIZING);
431             assertThingStatus(statusUpdates, statusUpdateIndex.get() + 1, ThingStatus.UNKNOWN);
432             assertThingStatus(statusUpdates, statusUpdateIndex.get() + 2, ThingStatus.ONLINE, ThingStatusDetail.NONE,
433                     "Waiting for initial data");
434
435             statusUpdateIndex.set(statusUpdateIndex.get() + 3);
436         });
437
438         List<ThingStatusInfo> statusUpdates = statusSubscriber.statusUpdates.get(ruuviThing.getUID());
439         assertNotNull(statusUpdates);
440         if (quickTimeout) {
441             if (isRunningInCI()) {
442                 triggerTimeoutHandling(ruuviThing);
443             }
444             waitForAssert(() -> {
445                 assertThingStatus(statusUpdates, statusUpdateIndex.get(), ThingStatus.OFFLINE,
446                         ThingStatusDetail.COMMUNICATION_ERROR, "No valid data received for some time");
447
448                 CHANNEL_TO_ITEM_TYPE.keySet()
449                         .forEach(channelId -> assertEquals(UnDefType.UNDEF, getItemState.apply(channelId)));
450                 statusUpdateIndex.incrementAndGet();
451             });
452         }
453
454         // publish some valid data ("valid case" test vector from
455         // https://docs.ruuvi.com/communication/bluetooth-advertisements/data-format-5-rawv2)
456         scheduler.execute(() -> publish(BASE_TOPIC_RUUVI + "/mygwid/DE:AD:BE:EF:BB:02",
457                 "{" + " \"gw_mac\": \"DE:AD:BE:EF:00\"," + " \"rssi\": -82," + " \"aoa\": [],"
458                         + " \"gwts\": \"1659365432\"," + " \"ts\": \"1659365222\","
459                         + " \"data\": \"0201061BFF99040512FC5394C37C0004FFFC040CAC364200CDCBB8334C884F\","
460                         + " \"coords\": \"\" }"));
461
462         waitForAssert(() -> {
463             assertThingStatus(statusUpdates, statusUpdateIndex.get(), ThingStatus.ONLINE);
464             statusUpdateIndex.incrementAndGet();
465         });
466
467         waitForAssert(() -> {
468             assertItems(getItemState, //
469                     "24.3", // temperature, Celsius
470                     "0.004", // acc X, g
471                     "-0.004", // acc Y, g
472                     "1.036", // acc Z, g
473                     "2.9770000000000003", // battery, volt
474                     5, // data format
475                     "53.49", // humidity %
476                     205, // measurement seq
477                     66, // movement
478                     "100044", // pressure, pascal
479                     "4", // tx power, dBm
480                     "-82", // RSSI, dBm
481                     Instant.ofEpochSecond(1659365222), // ts
482                     Instant.ofEpochSecond(1659365432), // gwts
483                     "DE:AD:BE:EF:00" // gw mac
484             );
485         });
486
487         if (quickTimeout) {
488             if (isRunningInCI()) {
489                 triggerTimeoutHandling(ruuviThing);
490             }
491             waitForAssert(() -> {
492                 assertThingStatus(statusUpdates, statusUpdateIndex.get(), ThingStatus.OFFLINE,
493                         ThingStatusDetail.COMMUNICATION_ERROR, "No valid data received for some time");
494                 CHANNEL_TO_ITEM_TYPE.keySet()
495                         .forEach(channelId -> assertEquals(UnDefType.UNDEF, getItemState.apply(channelId)));
496                 statusUpdateIndex.incrementAndGet();
497             });
498
499         }
500
501         // Another mqtt update (("minimum values" test vector from
502         // https://docs.ruuvi.com/communication/bluetooth-advertisements/data-format-5-rawv2)
503         scheduler.execute(() -> publish(BASE_TOPIC_RUUVI + "/mygwid/DE:AD:BE:EF:BB:02",
504                 "{" + " \"gw_mac\": \"DE:AD:BE:EF:00\"," + " \"rssi\": -66," + " \"aoa\": [],"
505                         + " \"gwts\": \"1659365431\"," + " \"ts\": \"1659365221\","
506                         + " \"data\": \"0201061BFF9904058001000000008001800180010000000000CBB8334C884F\","
507                         + " \"coords\": \"\" }"));
508         if (quickTimeout) {
509             // With quick timeout we were previously offline, so now we should be back online
510             // with valid channels.
511             waitForAssert(() -> {
512                 assertThingStatus(statusUpdates, statusUpdateIndex.get(), ThingStatus.ONLINE);
513                 statusUpdateIndex.getAndIncrement();
514             });
515
516             // ...after a while all items are updated
517             waitForAssert(() -> {
518                 assertItems(getItemState, //
519                         "-163.835", // temperature, Celsius
520                         "-32.767", // acc X, g
521                         "-32.767", // acc Y, g
522                         "-32.767", // acc Z, g
523                         "1.6", // battery, volt
524                         5, // data format
525                         "0.0", // humidity %
526                         0, // measurement seq
527                         0, // movement
528                         "50000", // pressure, pascal
529                         "-40", // tx power, dBm
530                         "-66", // RSSI, dBm
531                         Instant.ofEpochSecond(1659365221), // ts
532                         Instant.ofEpochSecond(1659365431), // gwts
533                         "DE:AD:BE:EF:00" // gw mac
534                 );
535             });
536
537             // ...after which timeout will happen again
538             if (isRunningInCI()) {
539                 triggerTimeoutHandling(ruuviThing);
540             }
541             waitForAssert(() -> {
542                 assertThingStatus(statusUpdates, statusUpdateIndex.get(), ThingStatus.OFFLINE,
543                         ThingStatusDetail.COMMUNICATION_ERROR, "No valid data received for some time");
544                 CHANNEL_TO_ITEM_TYPE.keySet()
545                         .forEach(channelId -> assertEquals(UnDefType.UNDEF, getItemState.apply(channelId)));
546                 statusUpdateIndex.getAndIncrement();
547             });
548         } else {
549             // with non-quick timeout we are still online, and items are updated
550             waitForAssert(() -> {
551                 assertItems(getItemState, //
552                         "-163.835", // temperature, Celsius
553                         "-32.767", // acc X, g
554                         "-32.767", // acc Y, g
555                         "-32.767", // acc Z, g
556                         "1.6", // battery, volt
557                         5, // data format
558                         "0.0", // humidity %
559                         0, // measurement seq
560                         0, // movement
561                         "50000", // pressure, pascal
562                         "-40", // tx power, dBm
563                         "-66", // RSSI, dBm
564                         Instant.ofEpochSecond(1659365221), // ts
565                         Instant.ofEpochSecond(1659365431), // gwts
566                         "DE:AD:BE:EF:00" // gw mac
567                 );
568             });
569         }
570
571         // assert that we have processed all status updates
572         assertEquals(statusUpdateIndex.get(), statusUpdates.size());
573     }
574 }