]> git.basschouten.com Git - openhab-addons.git/blob
a925738e325377a6b1273252532fc5196f68593e
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2023 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.mqtt.ruuvigateway.internal.handler;
14
15 import static org.openhab.binding.mqtt.ruuvigateway.internal.RuuviGatewayBindingConstants.*;
16
17 import java.lang.reflect.InvocationTargetException;
18 import java.nio.charset.StandardCharsets;
19 import java.time.Instant;
20 import java.util.HashMap;
21 import java.util.Map;
22 import java.util.Objects;
23 import java.util.Optional;
24 import java.util.concurrent.CompletableFuture;
25 import java.util.concurrent.ScheduledFuture;
26 import java.util.concurrent.TimeUnit;
27 import java.util.concurrent.atomic.AtomicBoolean;
28 import java.util.stream.Collectors;
29
30 import javax.measure.Quantity;
31 import javax.measure.Unit;
32
33 import org.eclipse.jdt.annotation.NonNullByDefault;
34 import org.eclipse.jdt.annotation.Nullable;
35 import org.openhab.binding.mqtt.generic.AbstractMQTTThingHandler;
36 import org.openhab.binding.mqtt.generic.ChannelState;
37 import org.openhab.binding.mqtt.ruuvigateway.internal.RuuviCachedDateTimeState;
38 import org.openhab.binding.mqtt.ruuvigateway.internal.RuuviCachedNumberState;
39 import org.openhab.binding.mqtt.ruuvigateway.internal.RuuviCachedStringState;
40 import org.openhab.binding.mqtt.ruuvigateway.internal.RuuviGatewayBindingConstants;
41 import org.openhab.binding.mqtt.ruuvigateway.internal.parser.GatewayPayloadParser;
42 import org.openhab.binding.mqtt.ruuvigateway.internal.parser.GatewayPayloadParser.GatewayPayload;
43 import org.openhab.core.config.core.Configuration;
44 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
45 import org.openhab.core.io.transport.mqtt.MqttMessageSubscriber;
46 import org.openhab.core.library.unit.SIUnits;
47 import org.openhab.core.library.unit.Units;
48 import org.openhab.core.thing.Channel;
49 import org.openhab.core.thing.ChannelUID;
50 import org.openhab.core.thing.Thing;
51 import org.openhab.core.thing.ThingStatus;
52 import org.openhab.core.thing.ThingStatusDetail;
53 import org.openhab.core.types.UnDefType;
54 import org.openhab.core.util.HexUtils;
55 import org.slf4j.Logger;
56 import org.slf4j.LoggerFactory;
57
58 import com.google.gson.JsonSyntaxException;
59
60 /**
61  * The {@link RuuviTagHandler} is responsible for updating RuuviTag Sensor data received from
62  * Ruuvi Gateway via MQTT.
63  *
64  * @author Sami Salonen - Initial contribution
65  */
66 @NonNullByDefault
67 public class RuuviTagHandler extends AbstractMQTTThingHandler implements MqttMessageSubscriber {
68
69     // Ruuvitag sends an update every 10 seconds. So we keep a heartbeat to give it some slack
70     private int heartbeatTimeoutMillisecs = 60_000;
71     // This map is used to initialize channel caches.
72     // Key is channel ID.
73     // Value is one of the following
74     // - null (plain number), uses RuuviCachedNumberState
75     // - Unit (QuantityType Number), uses RuuviCachedNumberState with unit
76     // - Class object, uses given class object with String constructor
77
78     private static final Map<String, @Nullable Object> unitByChannelUID = new HashMap<>(11);
79     static {
80         unitByChannelUID.put(CHANNEL_ID_ACCELERATIONX, Units.STANDARD_GRAVITY);
81         unitByChannelUID.put(CHANNEL_ID_ACCELERATIONY, Units.STANDARD_GRAVITY);
82         unitByChannelUID.put(CHANNEL_ID_ACCELERATIONZ, Units.STANDARD_GRAVITY);
83         unitByChannelUID.put(CHANNEL_ID_BATTERY, Units.VOLT);
84         unitByChannelUID.put(CHANNEL_ID_DATA_FORMAT, null);
85         unitByChannelUID.put(CHANNEL_ID_HUMIDITY, Units.PERCENT);
86         unitByChannelUID.put(CHANNEL_ID_MEASUREMENT_SEQUENCE_NUMBER, Units.ONE);
87         unitByChannelUID.put(CHANNEL_ID_MOVEMENT_COUNTER, Units.ONE);
88         unitByChannelUID.put(CHANNEL_ID_PRESSURE, SIUnits.PASCAL);
89         unitByChannelUID.put(CHANNEL_ID_TEMPERATURE, SIUnits.CELSIUS);
90         unitByChannelUID.put(CHANNEL_ID_TX_POWER, Units.DECIBEL_MILLIWATTS);
91         unitByChannelUID.put(CHANNEL_ID_RSSI, Units.DECIBEL_MILLIWATTS);
92         unitByChannelUID.put(CHANNEL_ID_TS, RuuviCachedDateTimeState.class);
93         unitByChannelUID.put(CHANNEL_ID_GWTS, RuuviCachedDateTimeState.class);
94         unitByChannelUID.put(CHANNEL_ID_GWMAC, RuuviCachedStringState.class);
95     }
96
97     private final Logger logger = LoggerFactory.getLogger(RuuviTagHandler.class);
98     /**
99      * Indicator whether we have received data recently
100      */
101     private final AtomicBoolean receivedData = new AtomicBoolean();
102     private final Map<ChannelUID, ChannelState> channelStateByChannelUID = new HashMap<>();
103     private @NonNullByDefault({}) ScheduledFuture<?> heartbeatFuture;
104
105     /**
106      * Topic with data for this particular Ruuvi Tag. Set in initialize (when configuration is valid).
107      */
108     private @NonNullByDefault({}) String topic;
109
110     public RuuviTagHandler(Thing thing, int subscribeTimeout) {
111         super(thing, subscribeTimeout);
112     }
113
114     @Override
115     public void initialize() {
116         initializeChannelCaches();
117         Configuration configuration = getThing().getConfiguration();
118         String topic = (String) configuration.get(RuuviGatewayBindingConstants.CONFIGURATION_PROPERTY_TOPIC);
119         if (topic == null || topic.isBlank()) {
120             updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_ERROR,
121                     "@text/offline.configuration-error.missing-topic");
122             return;
123         }
124         Object timeout = configuration.get(RuuviGatewayBindingConstants.CONFIGURATION_PROPERTY_TIMEOUT);
125         if (timeout != null) {
126             // Note: only in tests
127             heartbeatTimeoutMillisecs = Integer.parseInt(timeout.toString());
128             logger.warn("Using overridden timeout: {}", heartbeatTimeoutMillisecs);
129         }
130
131         this.topic = topic;
132         super.initialize();
133     }
134
135     private void initializeChannelCaches() {
136         for (Channel channel : thing.getChannels()) {
137             ChannelUID channelUID = channel.getUID();
138             String channelID = channelUID.getId();
139             assert unitByChannelUID.containsKey(channelID); // Invariant as all channels should exist in the static map
140             Object cacheHint = unitByChannelUID.get(channelID);
141             if (cacheHint == null || cacheHint instanceof Unit<?>) {
142                 Unit<?> unit = (Unit<?>) cacheHint;
143                 initNumberStateCache(channelUID, unit);
144             } else {
145                 Class<?> cacheType = (Class<?>) cacheHint;
146                 initCacheWithClass(channelUID, cacheType);
147             }
148
149         }
150     }
151
152     private <T extends Quantity<T>> RuuviCachedNumberState<?> initNumberStateCache(ChannelUID channelUID,
153             @Nullable Unit<T> unit) {
154         final RuuviCachedNumberState<?> cached;
155         if (unit == null) {
156             cached = new RuuviCachedNumberState<>(channelUID);
157             channelStateByChannelUID.put(channelUID, cached);
158         } else {
159             cached = new RuuviCachedNumberState<>(channelUID, unit);
160             channelStateByChannelUID.put(channelUID, cached);
161         }
162         return cached;
163     }
164
165     private ChannelState initCacheWithClass(ChannelUID channelUID, Class<?> clazz) {
166         try {
167             ChannelState cached = (ChannelState) clazz.getConstructor(ChannelUID.class).newInstance(channelUID);
168             Objects.requireNonNull(cached); // to make compiler happy
169             channelStateByChannelUID.put(channelUID, cached);
170             return cached;
171         } catch (InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException
172                 | NoSuchMethodException | SecurityException e) {
173             throw new IllegalStateException(e);
174         }
175     }
176
177     @Override
178     protected CompletableFuture<@Nullable Void> start(MqttBrokerConnection connection) {
179         if (topic == null) {
180             // Initialization has not been completed successfully, return early without changing
181             // thing status
182             return CompletableFuture.completedFuture(null);
183         }
184
185         updateStatus(ThingStatus.UNKNOWN);
186         return connection.subscribe(topic, this).handle((subscriptionSuccess, subscriptionException) -> {
187             if (subscriptionSuccess) {
188                 updateStatus(ThingStatus.ONLINE, ThingStatusDetail.NONE, "@text/online.waiting-initial-data");
189                 heartbeatFuture = scheduler.scheduleWithFixedDelay(this::heartbeat, heartbeatTimeoutMillisecs,
190                         heartbeatTimeoutMillisecs, TimeUnit.MILLISECONDS);
191             } else {
192                 if (subscriptionException == null) {
193                     updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
194                             "@text/offline.communication-error.mqtt-subscription-failed");
195                 } else {
196                     updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
197                             "@text/offline.communication-error.mqtt-subscription-failed-details [\""
198                                     + subscriptionException.getClass().getSimpleName() + "\", \""
199                                     + subscriptionException.getMessage() + "\"]");
200                 }
201             }
202             return null;
203         });
204     }
205
206     @Override
207     public CompletableFuture<Void> unsubscribeAll() {
208         MqttBrokerConnection localConnection = connection;
209         String localTopic = topic;
210         if (localConnection != null && localTopic != null) {
211             return localConnection.unsubscribe(localTopic, this).thenCompose(unsubscribeSuccessful -> null);
212         } else {
213             return CompletableFuture.completedFuture(null);
214         }
215     }
216
217     @Override
218     protected void stop() {
219         ScheduledFuture<?> localHeartbeatFuture = heartbeatFuture;
220         if (localHeartbeatFuture != null) {
221             localHeartbeatFuture.cancel(true);
222             heartbeatFuture = null;
223         }
224         channelStateByChannelUID.values().forEach(c -> c.getCache().resetState());
225         super.stop();
226     }
227
228     @Override
229     public void dispose() {
230         super.dispose();
231         channelStateByChannelUID.clear();
232     }
233
234     /**
235      * Called regularly. Tries to set receivedData to false. If it was already false and thing is ONLINE,
236      * update thing as OFFLINE with COMMUNICATION_ERROR.
237      */
238     private void heartbeat() {
239         synchronized (receivedData) {
240             if (!receivedData.getAndSet(false) && getThing().getStatus() == ThingStatus.ONLINE) {
241                 getThing().getChannels().stream().map(Channel::getUID).filter(this::isLinked)
242                         .forEach(c -> updateChannelState(c, UnDefType.UNDEF));
243                 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
244                         "@text/offline.communication-error.timeout");
245             }
246         }
247     }
248
249     @Override
250     public void processMessage(String topic, byte[] payload) {
251         receivedData.set(true);
252
253         final GatewayPayload parsed;
254         try {
255             parsed = GatewayPayloadParser.parse(payload);
256         } catch (JsonSyntaxException | IllegalArgumentException e) {
257             // Perhaps thing has been configured with wrong topic. Logging extra details with trace
258             // Thing status change will be visible in logs with higher log level
259             logger.trace("Received invalid data which could not be parsed to any known Ruuvi Tag data formats ({}): {}",
260                     e.getMessage(), new String(payload, StandardCharsets.UTF_8));
261             updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
262                     "@text/offline.communication-error.parse-error [\"" + e.getMessage() + "\"]");
263             return;
264         }
265         var ruuvitagData = parsed.measurement;
266
267         boolean atLeastOneRuuviFieldPresent = false;
268         for (Channel channel : thing.getChannels()) {
269             ChannelUID channelUID = channel.getUID();
270             switch (channelUID.getId()) {
271                 case CHANNEL_ID_ACCELERATIONX:
272                     atLeastOneRuuviFieldPresent |= updateStateIfLinked(channelUID, ruuvitagData.getAccelerationX());
273                     break;
274                 case CHANNEL_ID_ACCELERATIONY:
275                     atLeastOneRuuviFieldPresent |= updateStateIfLinked(channelUID, ruuvitagData.getAccelerationY());
276                     break;
277                 case CHANNEL_ID_ACCELERATIONZ:
278                     atLeastOneRuuviFieldPresent |= updateStateIfLinked(channelUID, ruuvitagData.getAccelerationZ());
279                     break;
280                 case CHANNEL_ID_BATTERY:
281                     atLeastOneRuuviFieldPresent |= updateStateIfLinked(channelUID, ruuvitagData.getBatteryVoltage());
282                     break;
283                 case CHANNEL_ID_DATA_FORMAT:
284                     atLeastOneRuuviFieldPresent |= updateStateIfLinked(channelUID, ruuvitagData.getDataFormat());
285                     break;
286                 case CHANNEL_ID_HUMIDITY:
287                     atLeastOneRuuviFieldPresent |= updateStateIfLinked(channelUID, ruuvitagData.getHumidity());
288                     break;
289                 case CHANNEL_ID_MEASUREMENT_SEQUENCE_NUMBER:
290                     atLeastOneRuuviFieldPresent |= updateStateIfLinked(channelUID,
291                             ruuvitagData.getMeasurementSequenceNumber());
292                     break;
293                 case CHANNEL_ID_MOVEMENT_COUNTER:
294                     atLeastOneRuuviFieldPresent |= updateStateIfLinked(channelUID, ruuvitagData.getMovementCounter());
295                     break;
296                 case CHANNEL_ID_PRESSURE:
297                     atLeastOneRuuviFieldPresent |= updateStateIfLinked(channelUID, ruuvitagData.getPressure());
298                     break;
299                 case CHANNEL_ID_TEMPERATURE:
300                     atLeastOneRuuviFieldPresent |= updateStateIfLinked(channelUID, ruuvitagData.getTemperature());
301                     break;
302                 case CHANNEL_ID_TX_POWER:
303                     atLeastOneRuuviFieldPresent |= updateStateIfLinked(channelUID, ruuvitagData.getTxPower());
304                     break;
305                 //
306                 // Auxiliary channels, not part of bluetooth advertisement
307                 //
308                 case CHANNEL_ID_RSSI:
309                     atLeastOneRuuviFieldPresent |= updateStateIfLinked(channelUID, parsed.rssi);
310                     break;
311                 case CHANNEL_ID_TS:
312                     atLeastOneRuuviFieldPresent |= updateDateTimeStateIfLinked(channelUID, parsed.ts);
313                     break;
314                 case CHANNEL_ID_GWTS:
315                     atLeastOneRuuviFieldPresent |= updateDateTimeStateIfLinked(channelUID, parsed.gwts);
316                     break;
317                 case CHANNEL_ID_GWMAC:
318                     atLeastOneRuuviFieldPresent |= updateStringStateIfLinked(channelUID, parsed.gwMac);
319                     break;
320                 default:
321                     logger.warn("BUG: We have unhandled channel: {}",
322                             thing.getChannels().stream().map(Channel::getUID).collect(Collectors.toList()));
323             }
324         }
325         if (atLeastOneRuuviFieldPresent) {
326             String thingStatusDescription = getThing().getStatusInfo().getDescription();
327             if (getThing().getStatus() != ThingStatus.ONLINE
328                     || (thingStatusDescription != null && !thingStatusDescription.isBlank())) {
329                 // Update thing as ONLINE and possibly clear the thing detail status
330                 updateStatus(ThingStatus.ONLINE, ThingStatusDetail.NONE);
331             }
332         } else {
333             if (logger.isTraceEnabled()) {
334                 logger.trace("Received Ruuvi Tag data but no fields could be parsed: {}", HexUtils.bytesToHex(payload));
335             }
336             updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
337                     "@text/offline.communication-error.parse-error-no-fields");
338         }
339     }
340
341     @Override
342     public @Nullable ChannelState getChannelState(ChannelUID channelUID) {
343         return channelStateByChannelUID.get(channelUID);
344     }
345
346     @Override
347     protected void updateThingStatus(boolean messageReceived, Optional<Boolean> availabilityTopicsSeen) {
348         // Not used here
349     }
350
351     /**
352      * Update number channel state
353      *
354      * Update is not done when value is null.
355      *
356      * @param channelUID channel UID
357      * @param value value to update
358      * @return whether the value was present
359      */
360     private boolean updateStateIfLinked(ChannelUID channelUID, @Nullable Number value) {
361         RuuviCachedNumberState<?> cache = (RuuviCachedNumberState<?>) channelStateByChannelUID.get(channelUID);
362         if (cache == null) {
363             // Invariant as channels should be initialized already
364             logger.warn("Channel {} not initialized. BUG", channelUID);
365             return false;
366         }
367         if (value == null) {
368             return false;
369         } else {
370             cache.update(value);
371             if (isLinked(channelUID)) {
372                 updateChannelState(channelUID, cache.getCache().getChannelState());
373             }
374             return true;
375         }
376     }
377
378     /**
379      * Update string channel state
380      *
381      * Update is not done when value is null.
382      *
383      * @param channelUID channel UID
384      * @param value value to update
385      * @return whether the value was present
386      */
387     private <T extends Quantity<T>> boolean updateStringStateIfLinked(ChannelUID channelUID, Optional<String> value) {
388         RuuviCachedStringState cache = (RuuviCachedStringState) channelStateByChannelUID.get(channelUID);
389         if (cache == null) {
390             // Invariant as channels should be initialized already
391             logger.error("Channel {} not initialized. BUG", channelUID);
392             return false;
393         }
394         if (value.isEmpty()) {
395             return false;
396         } else {
397             cache.update(value.get());
398             if (isLinked(channelUID)) {
399                 updateChannelState(channelUID, cache.getCache().getChannelState());
400             }
401             return true;
402         }
403     }
404
405     /**
406      * Update date time channel state
407      *
408      * Update is not done when value is null.
409      *
410      * @param channelUID channel UID
411      * @param value value to update
412      * @return whether the value was present
413      */
414     private boolean updateDateTimeStateIfLinked(ChannelUID channelUID, Optional<Instant> value) {
415         RuuviCachedDateTimeState cache = (RuuviCachedDateTimeState) channelStateByChannelUID.get(channelUID);
416         if (cache == null) {
417             // Invariant as channels should be initialized already
418             logger.error("Channel {} not initialized. BUG", channelUID);
419             return false;
420         }
421         if (value.isEmpty()) {
422             return false;
423         } else {
424             cache.update(value.get());
425             if (isLinked(channelUID)) {
426                 updateChannelState(channelUID, cache.getCache().getChannelState());
427             }
428             return true;
429         }
430     }
431 }