2 * Copyright (c) 2010-2023 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.mqtt.ruuvigateway.internal.handler;
15 import static org.openhab.binding.mqtt.ruuvigateway.internal.RuuviGatewayBindingConstants.*;
17 import java.lang.reflect.InvocationTargetException;
18 import java.nio.charset.StandardCharsets;
19 import java.time.Instant;
20 import java.util.HashMap;
22 import java.util.Objects;
23 import java.util.Optional;
24 import java.util.concurrent.CompletableFuture;
25 import java.util.concurrent.ScheduledFuture;
26 import java.util.concurrent.TimeUnit;
27 import java.util.concurrent.atomic.AtomicBoolean;
28 import java.util.stream.Collectors;
30 import javax.measure.Quantity;
31 import javax.measure.Unit;
33 import org.eclipse.jdt.annotation.NonNullByDefault;
34 import org.eclipse.jdt.annotation.Nullable;
35 import org.openhab.binding.mqtt.generic.AbstractMQTTThingHandler;
36 import org.openhab.binding.mqtt.generic.ChannelState;
37 import org.openhab.binding.mqtt.ruuvigateway.internal.RuuviCachedDateTimeState;
38 import org.openhab.binding.mqtt.ruuvigateway.internal.RuuviCachedNumberState;
39 import org.openhab.binding.mqtt.ruuvigateway.internal.RuuviCachedStringState;
40 import org.openhab.binding.mqtt.ruuvigateway.internal.RuuviGatewayBindingConstants;
41 import org.openhab.binding.mqtt.ruuvigateway.internal.parser.GatewayPayloadParser;
42 import org.openhab.binding.mqtt.ruuvigateway.internal.parser.GatewayPayloadParser.GatewayPayload;
43 import org.openhab.core.config.core.Configuration;
44 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
45 import org.openhab.core.io.transport.mqtt.MqttMessageSubscriber;
46 import org.openhab.core.library.unit.SIUnits;
47 import org.openhab.core.library.unit.Units;
48 import org.openhab.core.thing.Channel;
49 import org.openhab.core.thing.ChannelUID;
50 import org.openhab.core.thing.Thing;
51 import org.openhab.core.thing.ThingStatus;
52 import org.openhab.core.thing.ThingStatusDetail;
53 import org.openhab.core.types.UnDefType;
54 import org.openhab.core.util.HexUtils;
55 import org.slf4j.Logger;
56 import org.slf4j.LoggerFactory;
58 import com.google.gson.JsonSyntaxException;
61 * The {@link RuuviTagHandler} is responsible updating RuuviTag Sensor data received from
62 * Ruuvi Gateway via MQTT.
64 * @author Sami Salonen - Initial contribution
67 public class RuuviTagHandler extends AbstractMQTTThingHandler implements MqttMessageSubscriber {
69 // Ruuvitag sends an update every 10 seconds. So we keep a heartbeat to give it some slack
70 private int heartbeatTimeoutMillisecs = 60_000;
71 // This map is used to initialize channel caches.
73 // Value is one of the following
74 // - null (plain number), uses RuuviCachedNumberState
75 // - Unit (QuantityType Number), uses RuuviCachedNumberState with unit
76 // - Class object, uses given class object with String constructor
78 private static final Map<String, @Nullable Object> unitByChannelUID = new HashMap<>(11);
80 unitByChannelUID.put(CHANNEL_ID_ACCELERATIONX, Units.STANDARD_GRAVITY);
81 unitByChannelUID.put(CHANNEL_ID_ACCELERATIONY, Units.STANDARD_GRAVITY);
82 unitByChannelUID.put(CHANNEL_ID_ACCELERATIONZ, Units.STANDARD_GRAVITY);
83 unitByChannelUID.put(CHANNEL_ID_BATTERY, Units.VOLT);
84 unitByChannelUID.put(CHANNEL_ID_DATA_FORMAT, null);
85 unitByChannelUID.put(CHANNEL_ID_HUMIDITY, Units.PERCENT);
86 unitByChannelUID.put(CHANNEL_ID_MEASUREMENT_SEQUENCE_NUMBER, Units.ONE);
87 unitByChannelUID.put(CHANNEL_ID_MOVEMENT_COUNTER, Units.ONE);
88 unitByChannelUID.put(CHANNEL_ID_PRESSURE, SIUnits.PASCAL);
89 unitByChannelUID.put(CHANNEL_ID_TEMPERATURE, SIUnits.CELSIUS);
90 unitByChannelUID.put(CHANNEL_ID_TX_POWER, Units.DECIBEL_MILLIWATTS);
91 unitByChannelUID.put(CHANNEL_ID_RSSI, Units.DECIBEL_MILLIWATTS);
92 unitByChannelUID.put(CHANNEL_ID_TS, RuuviCachedDateTimeState.class);
93 unitByChannelUID.put(CHANNEL_ID_GWTS, RuuviCachedDateTimeState.class);
94 unitByChannelUID.put(CHANNEL_ID_GWMAC, RuuviCachedStringState.class);
97 private final Logger logger = LoggerFactory.getLogger(RuuviTagHandler.class);
99 * Indicator whether we have received data recently
101 private final AtomicBoolean receivedData = new AtomicBoolean();
102 private final Map<ChannelUID, ChannelState> channelStateByChannelUID = new HashMap<>();
103 private @NonNullByDefault({}) ScheduledFuture<?> heartbeatFuture;
106 * Topic with data for this particular Ruuvi Tag. Set in initialize (when configuration is valid).
108 private @NonNullByDefault({}) String topic;
110 public RuuviTagHandler(Thing thing, int subscribeTimeout) {
111 super(thing, subscribeTimeout);
115 public void initialize() {
116 initializeChannelCaches();
117 Configuration configuration = getThing().getConfiguration();
118 String topic = (String) configuration.get(RuuviGatewayBindingConstants.CONFIGURATION_PROPERTY_TOPIC);
119 if (topic == null || topic.isBlank()) {
120 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_ERROR,
121 "@text/offline.configuration-error.missing-topic");
124 Object timeout = configuration.get(RuuviGatewayBindingConstants.CONFIGURATION_PROPERTY_TIMEOUT);
125 if (timeout != null) {
126 // Note: only in tests
127 heartbeatTimeoutMillisecs = Integer.parseInt(timeout.toString());
128 logger.warn("Using overridden timeout: {}", heartbeatTimeoutMillisecs);
135 private void initializeChannelCaches() {
136 for (Channel channel : thing.getChannels()) {
137 ChannelUID channelUID = channel.getUID();
138 String channelID = channelUID.getId();
139 assert unitByChannelUID.containsKey(channelID); // Invariant as all channels should exist in the static map
140 Object cacheHint = unitByChannelUID.get(channelID);
141 if (cacheHint == null || cacheHint instanceof Unit<?>) {
142 Unit<?> unit = (Unit<?>) cacheHint;
143 initNumberStateCache(channelUID, unit);
145 Class<?> cacheType = (Class<?>) cacheHint;
146 initCacheWithClass(channelUID, cacheType);
152 private <T extends Quantity<T>> RuuviCachedNumberState<?> initNumberStateCache(ChannelUID channelUID,
153 @Nullable Unit<T> unit) {
154 final RuuviCachedNumberState<?> cached;
156 cached = new RuuviCachedNumberState<>(channelUID);
157 channelStateByChannelUID.put(channelUID, cached);
159 cached = new RuuviCachedNumberState<>(channelUID, unit);
160 channelStateByChannelUID.put(channelUID, cached);
165 private ChannelState initCacheWithClass(ChannelUID channelUID, Class<?> clazz) {
167 ChannelState cached = (ChannelState) clazz.getConstructor(ChannelUID.class).newInstance(channelUID);
168 Objects.requireNonNull(cached); // to make compiler happy
169 channelStateByChannelUID.put(channelUID, cached);
171 } catch (InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException
172 | NoSuchMethodException | SecurityException e) {
173 throw new IllegalStateException(e);
178 protected CompletableFuture<@Nullable Void> start(MqttBrokerConnection connection) {
180 // Initialization has not been completed successfully, return early without changing
182 return CompletableFuture.completedFuture(null);
185 updateStatus(ThingStatus.UNKNOWN);
186 return connection.subscribe(topic, this).handle((subscriptionSuccess, subscriptionException) -> {
187 if (subscriptionSuccess) {
188 updateStatus(ThingStatus.ONLINE, ThingStatusDetail.NONE, "@text/online.waiting-initial-data");
189 heartbeatFuture = scheduler.scheduleWithFixedDelay(this::heartbeat, heartbeatTimeoutMillisecs,
190 heartbeatTimeoutMillisecs, TimeUnit.MILLISECONDS);
192 if (subscriptionException == null) {
193 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
194 "@text/offline.communication-error.mqtt-subscription-failed");
196 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
197 "@text/offline.communication-error.mqtt-subscription-failed-details [\""
198 + subscriptionException.getClass().getSimpleName() + "\", \""
199 + subscriptionException.getMessage() + "\"]");
207 public CompletableFuture<Void> unsubscribeAll() {
208 MqttBrokerConnection localConnection = connection;
209 String localTopic = topic;
210 if (localConnection != null && localTopic != null) {
211 return localConnection.unsubscribe(localTopic, this).thenCompose(unsubscribeSuccessful -> null);
213 return CompletableFuture.completedFuture(null);
218 protected void stop() {
219 ScheduledFuture<?> localHeartbeatFuture = heartbeatFuture;
220 if (localHeartbeatFuture != null) {
221 localHeartbeatFuture.cancel(true);
222 heartbeatFuture = null;
224 channelStateByChannelUID.values().forEach(c -> c.getCache().resetState());
229 public void dispose() {
231 channelStateByChannelUID.clear();
235 * Called regularly. Tries to set receivedData to false. If it was already false and thing is ONLINE,
236 * update thing as OFFLINE with COMMUNICATION_ERROR.
238 private void heartbeat() {
239 synchronized (receivedData) {
240 if (!receivedData.getAndSet(false) && getThing().getStatus() == ThingStatus.ONLINE) {
241 getThing().getChannels().stream().map(Channel::getUID).filter(this::isLinked)
242 .forEach(c -> updateChannelState(c, UnDefType.UNDEF));
243 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
244 "@text/offline.communication-error.timeout");
250 public void processMessage(String topic, byte[] payload) {
251 receivedData.set(true);
253 final GatewayPayload parsed;
255 parsed = GatewayPayloadParser.parse(payload);
256 } catch (JsonSyntaxException | IllegalArgumentException e) {
257 // Perhaps thing has been configured with wrong topic. Logging extra details with trace
258 // Thing status change will be visible in logs with higher log level
259 logger.trace("Received invalid data which could not be parsed to any known Ruuvi Tag data formats ({}): {}",
260 e.getMessage(), new String(payload, StandardCharsets.UTF_8));
261 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
262 "@text/offline.communication-error.parse-error [\"" + e.getMessage() + "\"]");
265 var ruuvitagData = parsed.measurement;
267 boolean atLeastOneRuuviFieldPresent = false;
268 for (Channel channel : thing.getChannels()) {
269 ChannelUID channelUID = channel.getUID();
270 switch (channelUID.getId()) {
271 case CHANNEL_ID_ACCELERATIONX:
272 atLeastOneRuuviFieldPresent |= updateStateIfLinked(channelUID, ruuvitagData.getAccelerationX());
274 case CHANNEL_ID_ACCELERATIONY:
275 atLeastOneRuuviFieldPresent |= updateStateIfLinked(channelUID, ruuvitagData.getAccelerationY());
277 case CHANNEL_ID_ACCELERATIONZ:
278 atLeastOneRuuviFieldPresent |= updateStateIfLinked(channelUID, ruuvitagData.getAccelerationZ());
280 case CHANNEL_ID_BATTERY:
281 atLeastOneRuuviFieldPresent |= updateStateIfLinked(channelUID, ruuvitagData.getBatteryVoltage());
283 case CHANNEL_ID_DATA_FORMAT:
284 atLeastOneRuuviFieldPresent |= updateStateIfLinked(channelUID, ruuvitagData.getDataFormat());
286 case CHANNEL_ID_HUMIDITY:
287 atLeastOneRuuviFieldPresent |= updateStateIfLinked(channelUID, ruuvitagData.getHumidity());
289 case CHANNEL_ID_MEASUREMENT_SEQUENCE_NUMBER:
290 atLeastOneRuuviFieldPresent |= updateStateIfLinked(channelUID,
291 ruuvitagData.getMeasurementSequenceNumber());
293 case CHANNEL_ID_MOVEMENT_COUNTER:
294 atLeastOneRuuviFieldPresent |= updateStateIfLinked(channelUID, ruuvitagData.getMovementCounter());
296 case CHANNEL_ID_PRESSURE:
297 atLeastOneRuuviFieldPresent |= updateStateIfLinked(channelUID, ruuvitagData.getPressure());
299 case CHANNEL_ID_TEMPERATURE:
300 atLeastOneRuuviFieldPresent |= updateStateIfLinked(channelUID, ruuvitagData.getTemperature());
302 case CHANNEL_ID_TX_POWER:
303 atLeastOneRuuviFieldPresent |= updateStateIfLinked(channelUID, ruuvitagData.getTxPower());
306 // Auxiliary channels, not part of bluetooth advertisement
308 case CHANNEL_ID_RSSI:
309 atLeastOneRuuviFieldPresent |= updateStateIfLinked(channelUID, parsed.rssi);
312 atLeastOneRuuviFieldPresent |= updateDateTimeStateIfLinked(channelUID, parsed.ts);
314 case CHANNEL_ID_GWTS:
315 atLeastOneRuuviFieldPresent |= updateDateTimeStateIfLinked(channelUID, parsed.gwts);
317 case CHANNEL_ID_GWMAC:
318 atLeastOneRuuviFieldPresent |= updateStringStateIfLinked(channelUID, parsed.gwMac);
321 logger.warn("BUG: We have unhandled channel: {}",
322 thing.getChannels().stream().map(Channel::getUID).collect(Collectors.toList()));
325 if (atLeastOneRuuviFieldPresent) {
326 String thingStatusDescription = getThing().getStatusInfo().getDescription();
327 if (getThing().getStatus() != ThingStatus.ONLINE
328 || (thingStatusDescription != null && !thingStatusDescription.isBlank())) {
329 // Update thing as ONLINE and possibly clear the thing detail status
330 updateStatus(ThingStatus.ONLINE, ThingStatusDetail.NONE);
333 if (logger.isTraceEnabled()) {
334 logger.trace("Received Ruuvi Tag data but no fields could be parsed: {}", HexUtils.bytesToHex(payload));
336 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
337 "@text/offline.communication-error.parse-error-no-fields");
342 public @Nullable ChannelState getChannelState(ChannelUID channelUID) {
343 return channelStateByChannelUID.get(channelUID);
347 protected void updateThingStatus(boolean messageReceived, Optional<Boolean> availabilityTopicsSeen) {
352 * Update number channel state
354 * Update is not done when value is null.
356 * @param channelUID channel UID
357 * @param value value to update
358 * @return whether the value was present
360 private boolean updateStateIfLinked(ChannelUID channelUID, @Nullable Number value) {
361 RuuviCachedNumberState<?> cache = (RuuviCachedNumberState<?>) channelStateByChannelUID.get(channelUID);
363 // Invariant as channels should be initialized already
364 logger.warn("Channel {} not initialized. BUG", channelUID);
371 if (isLinked(channelUID)) {
372 updateChannelState(channelUID, cache.getCache().getChannelState());
379 * Update string channel state
381 * Update is not done when value is null.
383 * @param channelUID channel UID
384 * @param value value to update
385 * @return whether the value was present
387 private <T extends Quantity<T>> boolean updateStringStateIfLinked(ChannelUID channelUID, Optional<String> value) {
388 RuuviCachedStringState cache = (RuuviCachedStringState) channelStateByChannelUID.get(channelUID);
390 // Invariant as channels should be initialized already
391 logger.error("Channel {} not initialized. BUG", channelUID);
394 if (value.isEmpty()) {
397 cache.update(value.get());
398 if (isLinked(channelUID)) {
399 updateChannelState(channelUID, cache.getCache().getChannelState());
406 * Update date time channel state
408 * Update is not done when value is null.
410 * @param channelUID channel UID
411 * @param value value to update
412 * @return whether the value was present
414 private boolean updateDateTimeStateIfLinked(ChannelUID channelUID, Optional<Instant> value) {
415 RuuviCachedDateTimeState cache = (RuuviCachedDateTimeState) channelStateByChannelUID.get(channelUID);
417 // Invariant as channels should be initialized already
418 logger.error("Channel {} not initialized. BUG", channelUID);
421 if (value.isEmpty()) {
424 cache.update(value.get());
425 if (isLinked(channelUID)) {
426 updateChannelState(channelUID, cache.getCache().getChannelState());