/**
 * Copyright (c) 2010-2023 Contributors to the openHAB project
 *
 * See the NOTICE file(s) distributed with this work for additional
 * information.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License 2.0 which is available at
 * http://www.eclipse.org/legal/epl-2.0
 *
 * SPDX-License-Identifier: EPL-2.0
 */
13 package org.openhab.binding.mqtt.ruuvigateway;
15 import static org.hamcrest.CoreMatchers.is;
16 import static org.hamcrest.MatcherAssert.assertThat;
17 import static org.junit.jupiter.api.Assertions.*;
18 import static org.mockito.Mockito.*;
19 import static org.openhab.binding.mqtt.ruuvigateway.internal.RuuviGatewayBindingConstants.*;
20 import static org.openhab.core.library.unit.MetricPrefix.HECTO;
22 import java.lang.reflect.InvocationTargetException;
23 import java.lang.reflect.Method;
24 import java.math.BigDecimal;
25 import java.time.Instant;
26 import java.time.ZoneId;
27 import java.util.ArrayList;
28 import java.util.HashMap;
29 import java.util.HashSet;
30 import java.util.List;
32 import java.util.Objects;
34 import java.util.concurrent.CompletableFuture;
35 import java.util.concurrent.CountDownLatch;
36 import java.util.concurrent.ScheduledExecutorService;
37 import java.util.concurrent.ScheduledThreadPoolExecutor;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.atomic.AtomicInteger;
40 import java.util.function.Function;
41 import java.util.stream.Collectors;
43 import javax.measure.quantity.Acceleration;
44 import javax.measure.quantity.Dimensionless;
45 import javax.measure.quantity.ElectricPotential;
46 import javax.measure.quantity.Power;
47 import javax.measure.quantity.Pressure;
48 import javax.measure.quantity.Temperature;
50 import org.eclipse.jdt.annotation.NonNullByDefault;
51 import org.eclipse.jdt.annotation.Nullable;
52 import org.junit.jupiter.api.AfterEach;
53 import org.junit.jupiter.api.BeforeEach;
54 import org.junit.jupiter.api.Test;
55 import org.junit.jupiter.api.extension.ExtendWith;
56 import org.junit.jupiter.params.ParameterizedTest;
57 import org.junit.jupiter.params.provider.CsvSource;
58 import org.junit.jupiter.params.provider.ValueSource;
59 import org.mockito.Mock;
60 import org.mockito.junit.jupiter.MockitoExtension;
61 import org.mockito.junit.jupiter.MockitoSettings;
62 import org.mockito.quality.Strictness;
63 import org.openhab.binding.mqtt.discovery.MQTTTopicDiscoveryService;
64 import org.openhab.binding.mqtt.ruuvigateway.internal.RuuviGatewayBindingConstants;
65 import org.openhab.binding.mqtt.ruuvigateway.internal.discovery.RuuviGatewayDiscoveryService;
66 import org.openhab.binding.mqtt.ruuvigateway.internal.handler.RuuviTagHandler;
67 import org.openhab.core.config.core.Configuration;
68 import org.openhab.core.config.discovery.DiscoveryResult;
69 import org.openhab.core.i18n.UnitProvider;
70 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
71 import org.openhab.core.io.transport.mqtt.MqttConnectionObserver;
72 import org.openhab.core.io.transport.mqtt.MqttConnectionState;
73 import org.openhab.core.items.GenericItem;
74 import org.openhab.core.library.CoreItemFactory;
75 import org.openhab.core.library.types.DateTimeType;
76 import org.openhab.core.library.types.DecimalType;
77 import org.openhab.core.library.types.QuantityType;
78 import org.openhab.core.library.types.StringType;
79 import org.openhab.core.library.unit.SIUnits;
80 import org.openhab.core.library.unit.Units;
81 import org.openhab.core.thing.Bridge;
82 import org.openhab.core.thing.Channel;
83 import org.openhab.core.thing.ChannelUID;
84 import org.openhab.core.thing.Thing;
85 import org.openhab.core.thing.ThingStatus;
86 import org.openhab.core.thing.ThingStatusDetail;
87 import org.openhab.core.thing.ThingStatusInfo;
88 import org.openhab.core.thing.ThingTypeUID;
89 import org.openhab.core.thing.ThingUID;
90 import org.openhab.core.thing.binding.ThingHandler;
91 import org.openhab.core.thing.binding.builder.BridgeBuilder;
92 import org.openhab.core.thing.binding.builder.ChannelBuilder;
93 import org.openhab.core.thing.binding.builder.ThingBuilder;
94 import org.openhab.core.thing.link.ItemChannelLink;
95 import org.openhab.core.types.State;
96 import org.openhab.core.types.UnDefType;
/**
 * A full implementation test, that starts the embedded MQTT broker and publishes test data
 *
 * @author David Graeff - Initial contribution
 * @author Sami Salonen - Adapted and extended to Ruuvi Gateway tests
 */
105 @ExtendWith(MockitoExtension.class)
106 @MockitoSettings(strictness = Strictness.LENIENT)
107 public class RuuviGatewayTest extends MqttOSGiTest {
protected @Mock @NonNullByDefault({}) UnitProvider mockedUnitProvider;
private static final String BASE_TOPIC_RUUVI = "ruuvi";

/**
 * Item type to create for each Ruuvi tag channel id. The key set is also used by the tests to enumerate
 * all channels of a Ruuvi tag thing.
 */
private static final Map<String, String> CHANNEL_TO_ITEM_TYPE = new HashMap<>();
static {
    CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_ACCELERATIONX, "Number:Acceleration");
    CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_ACCELERATIONY, "Number:Acceleration");
    CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_ACCELERATIONZ, "Number:Acceleration");
    CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_BATTERY, "Number:ElectricPotential");
    CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_DATA_FORMAT, "Number");
    CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_HUMIDITY, "Number:Dimensionless");
    CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_MEASUREMENT_SEQUENCE_NUMBER, "Number");
    CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_MOVEMENT_COUNTER, "Number");
    CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_PRESSURE, "Number:Pressure");
    CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_TEMPERATURE, "Number:Temperature");
    CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_TX_POWER, "Number:Power");
    CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_RSSI, "Number:Power");
    CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_TS, "DateTime");
    CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_GWTS, "DateTime");
    CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_GWMAC, "String");
}

private ThingStatusInfoChangedSubscriber statusSubscriber = new ThingStatusInfoChangedSubscriber();
private @NonNullByDefault({}) MqttBrokerConnection mqttConnection;
private int registeredTopics = 100;

private @NonNullByDefault({}) ScheduledExecutorService scheduler;

/**
 * Create an observer that fails the test as soon as the broker client connection changes its connection state
 * to something else than CONNECTED.
 */
private MqttConnectionObserver failIfChange = (state, error) -> assertThat(state,
        is(MqttConnectionState.CONNECTED));

@SuppressWarnings("unused") // used indirectly with Inbox
private @NonNullByDefault({}) RuuviGatewayDiscoveryService ruuviDiscoveryService;
/** Things created by a test; removed again in {@code afterEach()}. */
private Set<Thing> things = new HashSet<>();
147 public void setup() {
148 when(mockedUnitProvider.getUnit(any())).then(i -> {
149 Class clazz = i.getArgument(0);
150 if (Temperature.class.equals(clazz)) {
151 return SIUnits.CELSIUS;
152 } else if (Acceleration.class.equals(clazz)) {
153 return Units.METRE_PER_SQUARE_SECOND;
154 } else if (Dimensionless.class.equals(clazz)) {
156 } else if (ElectricPotential.class.equals(clazz)) {
158 } else if (Pressure.class.equals(clazz)) {
159 return HECTO(SIUnits.PASCAL);
160 } else if (Power.class.equals(clazz)) {
167 private Bridge createMqttBrokerBridge() {
168 Configuration configuration = new Configuration();
169 configuration.put("host", "127.0.0.1");
170 configuration.put("port", brokerConnection.getPort());
171 Bridge bridge = BridgeBuilder.create(new ThingTypeUID("mqtt", "broker"), "mybroker").withLabel("MQTT Broker")
172 .withConfiguration(configuration).build();
173 thingProvider.add(bridge);
174 waitForAssert(() -> assertNotNull(bridge.getHandler()));
175 assertNotNull(bridge.getConfiguration());
180 private Thing createRuuviThing(String brokerPrefix, String topic, @Nullable Integer timeoutMillisecs) {
181 Configuration configuration = new Configuration();
182 configuration.put(RuuviGatewayBindingConstants.CONFIGURATION_PROPERTY_TOPIC, topic);
183 if (timeoutMillisecs != null) {
184 configuration.put(RuuviGatewayBindingConstants.CONFIGURATION_PROPERTY_TIMEOUT, timeoutMillisecs);
186 ThingUID bridgeThingUID = new ThingUID("mqtt", "broker", "mybroker");
187 ThingUID thingUID = new ThingUID(RuuviGatewayBindingConstants.THING_TYPE_BEACON,
188 topic.replaceAll("[:_/]", "_"));
189 ThingBuilder thingBuilder = ThingBuilder.create(RuuviGatewayBindingConstants.THING_TYPE_BEACON, thingUID)
190 .withBridge(bridgeThingUID).withLabel("Ruuvi " + topic).withConfiguration(configuration);
192 CHANNEL_TO_ITEM_TYPE.forEach((channelId, _itemType) -> {
193 thingBuilder.withChannel(ChannelBuilder.create(new ChannelUID(thingUID, channelId)).build());
196 Thing thing = thingBuilder.build();
197 thingProvider.add(thing);
198 waitForAssert(() -> assertNotNull(thing.getHandler()));
199 assertNotNull(thing.getConfiguration());
204 private void triggerTimeoutHandling(Thing ruuviThing) {
205 // Simulate some time passing, so that RuuviTagHandler.heartbeat() is called twice
206 // Two heartbeat calls happens to trigger timeout handling in handler, one is not enough.
207 // (this is really implementation detail of RuuviTagHandler, making this test slightly
208 // error prone to possible changes in RuuviTagHandler implementation)
210 // 0. Assume some data received already, RuuviTagHandler.receivedData is true
211 // 1. First heartbeat sets receivedData=false; no further action is taken yet
212 // 2. Second heartbeat acts on false receivedData, e.g. updating Thing Status
213 for (int i = 0; i < 2; i++) {
214 callInternalHeartbeat(ruuviThing);
218 private void callInternalHeartbeat(Thing ruuviThing) {
219 ThingHandler handler = ruuviThing.getHandler();
220 Objects.requireNonNull(handler);
221 assertInstanceOf(RuuviTagHandler.class, handler);
222 RuuviTagHandler ruuviHandler = (RuuviTagHandler) handler;
224 Method heartbeatMethod = RuuviTagHandler.class.getDeclaredMethod("heartbeat");
225 Objects.requireNonNull(heartbeatMethod);
226 heartbeatMethod.setAccessible(true);
227 heartbeatMethod.invoke(ruuviHandler);
228 } catch (NoSuchMethodException | SecurityException | IllegalAccessException | IllegalArgumentException
229 | InvocationTargetException e) {
230 fail("Failed to call heartbeat method of thing handler via reflection. Bug in test? Details: "
231 + e.getClass().getSimpleName() + ": " + e.getMessage());
232 throw new RuntimeException(e);
236 private String itemName(ChannelUID channelUID) {
237 return channelUID.getAsString().replace(":", "_");
240 private String linkChannelToAutogeneratedItem(ChannelUID channelUID) {
241 String itemName = itemName(channelUID);
242 String itemType = CHANNEL_TO_ITEM_TYPE.get(channelUID.getId());
243 GenericItem item = new CoreItemFactory(mockedUnitProvider).createItem(itemType, itemName);
244 assertNotNull(item, itemType);
245 itemProvider.add(item);
246 itemChannelLinkProvider.add(new ItemChannelLink(itemName, channelUID));
252 public void beforeEach() throws Exception {
255 statusSubscriber.statusUpdates.clear();
256 registerService(statusSubscriber);
258 MQTTTopicDiscoveryService mqttTopicDiscoveryService = getService(MQTTTopicDiscoveryService.class);
259 assertNotNull(mqttTopicDiscoveryService);
260 ruuviDiscoveryService = new RuuviGatewayDiscoveryService(mqttTopicDiscoveryService);
262 createMqttBrokerBridge();
264 mqttConnection = createBrokerConnection("myclientid");
266 // If the connection state changes in between -> fail
267 mqttConnection.addConnectionObserver(failIfChange);
269 List<CompletableFuture<Boolean>> futures = new ArrayList<>();
270 futures.add(publish(BASE_TOPIC_RUUVI + "/mygwid/DE:AD:BE:EF:AA:00", "{}"));
272 registeredTopics = futures.size();
273 CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(5, TimeUnit.SECONDS);
275 scheduler = new ScheduledThreadPoolExecutor(6);
280 public void afterEach() throws Exception {
281 if (mqttConnection != null) {
282 mqttConnection.removeConnectionObserver(failIfChange);
283 mqttConnection.stop().get(5, TimeUnit.SECONDS);
285 things.stream().map(thing -> thingProvider.remove(thing.getUID()));
286 unregisterService(statusSubscriber);
288 if (scheduler != null) {
289 scheduler.shutdownNow();
295 public void retrieveAllRuuviPrefixedTopics() throws Exception {
296 CountDownLatch c = new CountDownLatch(registeredTopics);
297 mqttConnection.subscribe(BASE_TOPIC_RUUVI + "/#", (topic, payload) -> c.countDown()).get(5, TimeUnit.SECONDS);
298 assertTrue(c.await(5, TimeUnit.SECONDS),
299 "Connection " + mqttConnection.getClientId() + " not retrieving all topics ");
302 private void assertThingStatus(List<ThingStatusInfo> statusUpdates, int index, ThingStatus status,
303 @Nullable ThingStatusDetail detail, @Nullable String description) {
304 assertTrue(statusUpdates.size() > index,
305 String.format("Not enough status updates. Expected %d, but only had %d. Status updates received: %s",
306 index + 1, statusUpdates.size(),
307 statusUpdates.stream().map(ThingStatusInfo::getStatus).collect(Collectors.toList())));
308 assertEquals(status, statusUpdates.get(index).getStatus(), statusUpdates.get(index).toString());
309 assertEquals(detail, statusUpdates.get(index).getStatusDetail(), statusUpdates.get(index).toString());
310 assertEquals(description, statusUpdates.get(index).getDescription(), statusUpdates.get(index).toString());
313 @SuppressWarnings("null")
314 private void assertThingStatusWithDescriptionPattern(List<ThingStatusInfo> statusUpdates, int index,
315 ThingStatus status, ThingStatusDetail detail, String descriptionPattern) {
316 assertTrue(statusUpdates.size() > index, "assert " + statusUpdates.size() + " > " + index + " failed");
317 assertEquals(status, statusUpdates.get(index).getStatus(), statusUpdates.get(index).toString());
318 assertEquals(detail, statusUpdates.get(index).getStatusDetail(), statusUpdates.get(index).toString());
319 assertTrue(statusUpdates.get(index).getDescription().matches(descriptionPattern),
320 statusUpdates.get(index).toString());
323 private void assertThingStatus(List<ThingStatusInfo> statusUpdates, int index, ThingStatus status) {
324 assertThingStatus(statusUpdates, index, status, ThingStatusDetail.NONE, null);
327 private void assertItems(Function<String, State> channelStateGetter, String temperatureCelsius,
328 String accelerationXStandardGravity, String accelerationYStandardGravity,
329 String accelerationZStandardGravity, String batteryVolt, int dataFormat, String humidityPercent,
330 int measurementSequenceNumber, int movementCounter, String pressurePascal, String txPowerDecibelMilliwatts,
331 String rssiDecibelMilliwatts, Instant ts, Instant gwts, String gwMac) {
332 assertEquals(new QuantityType<Temperature>(new BigDecimal(temperatureCelsius), SIUnits.CELSIUS),
333 channelStateGetter.apply(CHANNEL_ID_TEMPERATURE));
335 new QuantityType<Acceleration>(new BigDecimal(accelerationXStandardGravity), Units.STANDARD_GRAVITY),
336 channelStateGetter.apply(CHANNEL_ID_ACCELERATIONX));
338 new QuantityType<Acceleration>(new BigDecimal(accelerationYStandardGravity), Units.STANDARD_GRAVITY),
339 channelStateGetter.apply(CHANNEL_ID_ACCELERATIONY));
341 new QuantityType<Acceleration>(new BigDecimal(accelerationZStandardGravity), Units.STANDARD_GRAVITY),
342 channelStateGetter.apply(CHANNEL_ID_ACCELERATIONZ));
343 assertEquals(new QuantityType<ElectricPotential>(new BigDecimal(batteryVolt), Units.VOLT),
344 channelStateGetter.apply(CHANNEL_ID_BATTERY));
345 assertEquals(new DecimalType(dataFormat), channelStateGetter.apply(CHANNEL_ID_DATA_FORMAT));
346 assertEquals(new QuantityType<Dimensionless>(new BigDecimal(humidityPercent), Units.PERCENT),
347 channelStateGetter.apply(CHANNEL_ID_HUMIDITY));
348 assertEquals(new DecimalType(new BigDecimal(measurementSequenceNumber)),
349 channelStateGetter.apply(CHANNEL_ID_MEASUREMENT_SEQUENCE_NUMBER));
350 assertEquals(new DecimalType(new BigDecimal(movementCounter)),
351 channelStateGetter.apply(CHANNEL_ID_MOVEMENT_COUNTER));
352 assertEquals(new QuantityType<Pressure>(new BigDecimal(pressurePascal), SIUnits.PASCAL),
353 channelStateGetter.apply(CHANNEL_ID_PRESSURE));
354 assertEquals(new QuantityType<Power>(new BigDecimal(txPowerDecibelMilliwatts), Units.DECIBEL_MILLIWATTS),
355 channelStateGetter.apply(CHANNEL_ID_TX_POWER));
357 assertEquals(new QuantityType<Power>(new BigDecimal(rssiDecibelMilliwatts), Units.DECIBEL_MILLIWATTS),
358 channelStateGetter.apply(CHANNEL_ID_RSSI));
359 assertEquals(new DateTimeType(ts.atZone(ZoneId.of("UTC"))), channelStateGetter.apply(CHANNEL_ID_TS));
360 assertEquals(new DateTimeType(gwts.atZone(ZoneId.of("UTC"))), channelStateGetter.apply(CHANNEL_ID_GWTS));
361 assertEquals(new StringType(gwMac), channelStateGetter.apply(CHANNEL_ID_GWMAC));
365 @CsvSource(delimiter = '@', value = { //
366 BASE_TOPIC_RUUVI + "mygwid/DE:AD:BE:EF:AA:01 @" + "{}", // empty json
367 BASE_TOPIC_RUUVI + "mygwid/DE:AD:BE:EF:BB:03 @" + "invalid json", // invalid json
368 BASE_TOPIC_RUUVI + "mygwid/DE:AD:BE:EF:BB:04 @" + "0201061BFF990405", // payload too short
369 BASE_TOPIC_RUUVI + "mygwid/DE:AD:BE:EF:BB:05 @"
370 + "0201061BFF99050512FC5394C37C0004FFFC040CAC364200CDCBB8334C884F", // wrong manufacturer id (the
371 // two bytes after FF do not
373 BASE_TOPIC_RUUVI + "mygwid/DE:AD:BE:EF:BB:06 @"
374 + "0201061BFA99040512FC5394C37C0004FFFC040CAC364200CDCBB8334C884F", // unexpected advertisement (no
375 // FF to indicate 'manufacturer
376 // specific' advertisement)
377 BASE_TOPIC_RUUVI + "mygwid/DE:AD:BE:EF:BB:07 @" + "{" + " \"gw_mac\": \"DE:AD:BE:EF:00\","
378 + " \"rssi\": -82," + " \"aoa\": [],"
379 // data field is number, not a string
380 + " \"gwts\": \"1659365432\"," + " \"ts\": \"1659365222\"," + " \"data\": 999,"
381 + " \"coords\": \"\" }", // wrong json data types
383 public void testInvalidCases(String topic, String val) throws Exception {
384 final String jsonPayload;
385 if (val.contains("{")) {
386 // test argument is specifiying the whole json payload
389 // test argument is only specifiying the data field in the json payload
390 // Fill rest of the fields with some valid values
391 jsonPayload = "{" + " \"gw_mac\": \"DE:AD:BE:EF:00\"," + " \"rssi\": -82," + " \"aoa\": [],"
392 + " \"gwts\": \"1659365432\"," + " \"ts\": \"1659365222\"," + " \"data\": \"" + val + "\","
393 + " \"coords\": \"\" }";
396 Thing ruuviThing = createRuuviThing("mygwid", topic, 100);
397 waitForAssert(() -> {
398 List<ThingStatusInfo> statusUpdates = statusSubscriber.statusUpdates.get(ruuviThing.getUID());
399 assertNotNull(statusUpdates);
400 int statusUpdateIndex = 0;
401 assertThingStatus(statusUpdates, statusUpdateIndex++, ThingStatus.INITIALIZING);
402 assertThingStatus(statusUpdates, statusUpdateIndex++, ThingStatus.UNKNOWN);
403 assertThingStatus(statusUpdates, statusUpdateIndex++, ThingStatus.ONLINE, ThingStatusDetail.NONE,
404 "Waiting for initial data");
405 assertThingStatus(statusUpdates, statusUpdateIndex++, ThingStatus.OFFLINE,
406 ThingStatusDetail.COMMUNICATION_ERROR, "No valid data received for some time");
407 scheduler.execute(() -> publish(topic, jsonPayload));
408 assertThingStatusWithDescriptionPattern(statusUpdates, statusUpdateIndex++, ThingStatus.OFFLINE,
409 ThingStatusDetail.COMMUNICATION_ERROR, ".*could not be parsed.*");
410 assertEquals(statusUpdateIndex, statusUpdates.size());
414 @SuppressWarnings("null")
416 public void testDiscovery() {
417 scheduler.execute(() -> publish(BASE_TOPIC_RUUVI + "/mygwid/DE:AD:BE:EF:BB:02",
418 "{" + " \"gw_mac\": \"DE:AD:BE:EF:00\"," + " \"rssi\": -82," + " \"aoa\": [],"
419 + " \"gwts\": \"1659365432\"," + " \"ts\": \"1659365222\","
420 + " \"data\": \"0201061BFF99040512FC5394C37C0004FFFC040CAC364200CDCBB8334C884F\","
421 + " \"coords\": \"\" }"));
422 waitForAssert(() -> {
423 assertEquals(2, inbox.getAll().size(), inbox.getAll().toString());
424 var discovered = new HashSet<DiscoveryResult>();
425 discovered.addAll(inbox.getAll());
427 for (var result : discovered) {
428 assertEquals(THING_TYPE_BEACON, result.getThingTypeUID());
429 assertEquals("topic", result.getRepresentationProperty());
430 Object topic = result.getProperties().get("topic");
431 assertNotNull(topic);
433 // published in this test
434 topic.equals((BASE_TOPIC_RUUVI + "/mygwid/DE:AD:BE:EF:BB:02"))
435 // published in beforeEach
436 || result.getProperties().get("topic")
437 .equals((BASE_TOPIC_RUUVI + "/mygwid/DE:AD:BE:EF:AA:00")));
443 @ValueSource(booleans = { true, false })
444 public void testHappyFlow(boolean quickTimeout) {
445 // with quickTimeout=false, heartbeat is effectively disabled. Thing will not "timeout" and go OFFLINE
446 // with quickTimeout=true, timeout happens very fast. In CI we use infinite timeout and trigger timeout manually
448 Thing ruuviThing = createRuuviThing("mygwid", BASE_TOPIC_RUUVI + "/mygwid/DE:AD:BE:EF:BB:02",
449 quickTimeout ? (isRunningInCI() ? 9_000_000 : 100) : 9_000_000);
450 // Link all channels to freshly created items
451 ruuviThing.getChannels().stream().map(Channel::getUID).forEach(this::linkChannelToAutogeneratedItem);
453 @SuppressWarnings("null")
454 Function<String, State> getItemState = channelId -> itemRegistry
455 .get(itemName(ruuviThing.getChannel(channelId).getUID())).getState();
457 AtomicInteger statusUpdateIndex = new AtomicInteger();
458 waitForAssert(() -> {
459 List<ThingStatusInfo> statusUpdates = statusSubscriber.statusUpdates.get(ruuviThing.getUID());
460 assertNotNull(statusUpdates);
462 assertThingStatus(statusUpdates, statusUpdateIndex.get(), ThingStatus.INITIALIZING);
463 assertThingStatus(statusUpdates, statusUpdateIndex.get() + 1, ThingStatus.UNKNOWN);
464 assertThingStatus(statusUpdates, statusUpdateIndex.get() + 2, ThingStatus.ONLINE, ThingStatusDetail.NONE,
465 "Waiting for initial data");
467 statusUpdateIndex.set(statusUpdateIndex.get() + 3);
470 List<ThingStatusInfo> statusUpdates = statusSubscriber.statusUpdates.get(ruuviThing.getUID());
471 assertNotNull(statusUpdates);
473 if (isRunningInCI()) {
474 triggerTimeoutHandling(ruuviThing);
476 waitForAssert(() -> {
477 assertThingStatus(statusUpdates, statusUpdateIndex.get(), ThingStatus.OFFLINE,
478 ThingStatusDetail.COMMUNICATION_ERROR, "No valid data received for some time");
480 CHANNEL_TO_ITEM_TYPE.keySet()
481 .forEach(channelId -> assertEquals(UnDefType.UNDEF, getItemState.apply(channelId)));
482 statusUpdateIndex.incrementAndGet();
486 // publish some valid data ("valid case" test vector from
487 // https://docs.ruuvi.com/communication/bluetooth-advertisements/data-format-5-rawv2)
488 scheduler.execute(() -> publish(BASE_TOPIC_RUUVI + "/mygwid/DE:AD:BE:EF:BB:02",
489 "{" + " \"gw_mac\": \"DE:AD:BE:EF:00\"," + " \"rssi\": -82," + " \"aoa\": [],"
490 + " \"gwts\": \"1659365432\"," + " \"ts\": \"1659365222\","
491 + " \"data\": \"0201061BFF99040512FC5394C37C0004FFFC040CAC364200CDCBB8334C884F\","
492 + " \"coords\": \"\" }"));
494 waitForAssert(() -> {
495 assertThingStatus(statusUpdates, statusUpdateIndex.get(), ThingStatus.ONLINE);
496 statusUpdateIndex.incrementAndGet();
499 waitForAssert(() -> {
500 assertItems(getItemState, //
501 "24.3", // temperature, Celsius
503 "-0.004", // acc Y, g
505 "2.9770000000000003", // battery, volt
507 "53.49", // humidity %
508 205, // measurement seq
510 "100044", // pressure, pascal
511 "4", // tx power, dBm
513 Instant.ofEpochSecond(1659365222), // ts
514 Instant.ofEpochSecond(1659365432), // gwts
515 "DE:AD:BE:EF:00" // gw mac
520 if (isRunningInCI()) {
521 triggerTimeoutHandling(ruuviThing);
523 waitForAssert(() -> {
524 assertThingStatus(statusUpdates, statusUpdateIndex.get(), ThingStatus.OFFLINE,
525 ThingStatusDetail.COMMUNICATION_ERROR, "No valid data received for some time");
526 CHANNEL_TO_ITEM_TYPE.keySet()
527 .forEach(channelId -> assertEquals(UnDefType.UNDEF, getItemState.apply(channelId)));
528 statusUpdateIndex.incrementAndGet();
533 // Another mqtt update (("minimum values" test vector from
534 // https://docs.ruuvi.com/communication/bluetooth-advertisements/data-format-5-rawv2)
535 scheduler.execute(() -> publish(BASE_TOPIC_RUUVI + "/mygwid/DE:AD:BE:EF:BB:02",
536 "{" + " \"gw_mac\": \"DE:AD:BE:EF:00\"," + " \"rssi\": -66," + " \"aoa\": [],"
537 + " \"gwts\": \"1659365431\"," + " \"ts\": \"1659365221\","
538 + " \"data\": \"0201061BFF9904058001000000008001800180010000000000CBB8334C884F\","
539 + " \"coords\": \"\" }"));
541 // With quick timeout we were previously offline, so now we should be back online
542 // with valid channels.
543 waitForAssert(() -> {
544 assertThingStatus(statusUpdates, statusUpdateIndex.get(), ThingStatus.ONLINE);
545 statusUpdateIndex.getAndIncrement();
548 // ...after a while all items are updated
549 waitForAssert(() -> {
550 assertItems(getItemState, //
551 "-163.835", // temperature, Celsius
552 "-32.767", // acc X, g
553 "-32.767", // acc Y, g
554 "-32.767", // acc Z, g
555 "1.6", // battery, volt
558 0, // measurement seq
560 "50000", // pressure, pascal
561 "-40", // tx power, dBm
563 Instant.ofEpochSecond(1659365221), // ts
564 Instant.ofEpochSecond(1659365431), // gwts
565 "DE:AD:BE:EF:00" // gw mac
569 // ...after which timeout will happen again
570 if (isRunningInCI()) {
571 triggerTimeoutHandling(ruuviThing);
573 waitForAssert(() -> {
574 assertThingStatus(statusUpdates, statusUpdateIndex.get(), ThingStatus.OFFLINE,
575 ThingStatusDetail.COMMUNICATION_ERROR, "No valid data received for some time");
576 CHANNEL_TO_ITEM_TYPE.keySet()
577 .forEach(channelId -> assertEquals(UnDefType.UNDEF, getItemState.apply(channelId)));
578 statusUpdateIndex.getAndIncrement();
581 // with non-quick timeout we are still online, and items are updated
582 waitForAssert(() -> {
583 assertItems(getItemState, //
584 "-163.835", // temperature, Celsius
585 "-32.767", // acc X, g
586 "-32.767", // acc Y, g
587 "-32.767", // acc Z, g
588 "1.6", // battery, volt
591 0, // measurement seq
593 "50000", // pressure, pascal
594 "-40", // tx power, dBm
596 Instant.ofEpochSecond(1659365221), // ts
597 Instant.ofEpochSecond(1659365431), // gwts
598 "DE:AD:BE:EF:00" // gw mac
603 // assert that we have processed all status updates
604 assertEquals(statusUpdateIndex.get(), statusUpdates.size());