]> git.basschouten.com Git - openhab-addons.git/commitdiff
[novafinedust] Optimizations on access to the serial port (#10005)
authorStefan Triller <t2000@users.noreply.github.com>
Thu, 4 Feb 2021 19:49:43 +0000 (20:49 +0100)
committerGitHub <noreply@github.com>
Thu, 4 Feb 2021 19:49:43 +0000 (11:49 -0800)
* [novafinedust] Test for optimizations on access to the serial port

- retry logic if port does not yet exist on startup
- do not write sleep command on shutdown if port has issues
- no not register data listener on port but wait for data instead to be
  compatible with RFC2217 serial over network implementation
- ignore all buffered data from device during initialization to get the
  device into a defined state

* Adress review comments

- moved most "normal" logging to TRACE level
- used lambda function

* Improve error messages as requested in the review

Signed-off-by: Stefan Triller <github@stefantriller.de>
bundles/org.openhab.binding.novafinedust/src/main/java/org/openhab/binding/novafinedust/internal/SDS011Handler.java
bundles/org.openhab.binding.novafinedust/src/main/java/org/openhab/binding/novafinedust/internal/sds011protocol/SDS011Communicator.java

index 656896215daf26e5c440ea9ddcbba24de9875471..87cec1ad18bc2a2a8e4c7eac1870f6234c7e06f5 100644 (file)
@@ -52,6 +52,7 @@ import org.slf4j.LoggerFactory;
 @NonNullByDefault
 public class SDS011Handler extends BaseThingHandler {
     private static final Duration CONNECTION_MONITOR_START_DELAY_OFFSET = Duration.ofSeconds(10);
+    private static final Duration RETRY_INIT_DELAY = Duration.ofSeconds(10);
 
     private final Logger logger = LoggerFactory.getLogger(SDS011Handler.class);
     private final SerialPortManager serialPortManager;
@@ -59,8 +60,9 @@ public class SDS011Handler extends BaseThingHandler {
     private NovaFineDustConfiguration config = new NovaFineDustConfiguration();
     private @Nullable SDS011Communicator communicator;
 
-    private @Nullable ScheduledFuture<?> pollingJob;
+    private @Nullable ScheduledFuture<?> dataReadJob;
     private @Nullable ScheduledFuture<?> connectionMonitor;
+    private @Nullable ScheduledFuture<?> retryInitJob;
 
     private ZonedDateTime lastCommunication = ZonedDateTime.now();
 
@@ -100,14 +102,16 @@ public class SDS011Handler extends BaseThingHandler {
             return;
         }
 
-        // parse ports and if the port is found, initialize the reader
+        // parse port and if the port is found, initialize the reader
         SerialPortIdentifier portId = serialPortManager.getIdentifier(config.port);
         if (portId == null) {
             updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.OFFLINE.CONFIGURATION_ERROR, "Port is not known!");
+            logger.debug("Serial port {} was not found, retrying in {}.", config.port, RETRY_INIT_DELAY);
+            retryInitJob = scheduler.schedule(this::initialize, RETRY_INIT_DELAY.getSeconds(), TimeUnit.SECONDS);
             return;
         }
 
-        this.communicator = new SDS011Communicator(this, portId);
+        this.communicator = new SDS011Communicator(this, portId, scheduler);
 
         if (config.reporting) {
             timeBetweenDataShouldArrive = Duration.ofMinutes(config.reportingInterval);
@@ -116,37 +120,24 @@ public class SDS011Handler extends BaseThingHandler {
             timeBetweenDataShouldArrive = Duration.ofSeconds(config.pollingInterval);
             scheduler.submit(() -> initializeCommunicator(WorkMode.POLLING, timeBetweenDataShouldArrive));
         }
-
-        Duration connectionMonitorStartDelay = timeBetweenDataShouldArrive.plus(CONNECTION_MONITOR_START_DELAY_OFFSET);
-        connectionMonitor = scheduler.scheduleWithFixedDelay(this::verifyIfStillConnected,
-                connectionMonitorStartDelay.getSeconds(), timeBetweenDataShouldArrive.getSeconds(), TimeUnit.SECONDS);
     }
 
     private void initializeCommunicator(WorkMode mode, Duration interval) {
         SDS011Communicator localCommunicator = communicator;
         if (localCommunicator == null) {
             updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.OFFLINE.COMMUNICATION_ERROR,
-                    "Could not create communicator instance");
+                    "Communicator instance is null in initializeCommunicator()");
             return;
         }
 
         boolean initSuccessful = false;
-        try {
-            initSuccessful = localCommunicator.initialize(mode, interval);
-        } catch (final IOException ex) {
-            updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.OFFLINE.COMMUNICATION_ERROR, "I/O error!");
-            return;
-        } catch (PortInUseException e) {
-            updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.OFFLINE.COMMUNICATION_ERROR, "Port is in use!");
-            return;
-        } catch (TooManyListenersException e) {
-            updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.OFFLINE.COMMUNICATION_ERROR,
-                    "Cannot attach listener to port!");
-            return;
-        } catch (UnsupportedCommOperationException e) {
-            updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.OFFLINE.COMMUNICATION_ERROR,
-                    "Cannot set serial port parameters");
-            return;
+        int retryInit = 3;
+        int retryCount = 0;
+        // sometimes the device is a little difficult and needs multiple configuration attempts
+        while (!initSuccessful && retryCount < retryInit) {
+            logger.trace("Trying to initialize device attempt={}", retryCount);
+            initSuccessful = doInit(localCommunicator, mode, interval);
+            retryCount++;
         }
 
         if (initSuccessful) {
@@ -154,7 +145,7 @@ public class SDS011Handler extends BaseThingHandler {
             updateStatus(ThingStatus.ONLINE);
 
             if (mode == WorkMode.POLLING) {
-                pollingJob = scheduler.scheduleWithFixedDelay(() -> {
+                dataReadJob = scheduler.scheduleWithFixedDelay(() -> {
                     try {
                         localCommunicator.requestSensorData();
                     } catch (IOException e) {
@@ -162,13 +153,55 @@ public class SDS011Handler extends BaseThingHandler {
                                 "Cannot query data from device");
                     }
                 }, 2, config.pollingInterval, TimeUnit.SECONDS);
+            } else {
+                // start a job that reads the port until data arrives
+                int reportingReadStartDelay = 10;
+                int startReadBeforeDataArrives = 5;
+                long readReportedDataInterval = (config.reportingInterval * 60) - reportingReadStartDelay
+                        - startReadBeforeDataArrives;
+                logger.trace("Scheduling job to receive reported values");
+                dataReadJob = scheduler.scheduleWithFixedDelay(() -> {
+                    try {
+                        localCommunicator.readSensorData();
+                    } catch (IOException e) {
+                        updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.OFFLINE.COMMUNICATION_ERROR,
+                                "Cannot query data from device, because: " + e.getMessage());
+                    }
+                }, reportingReadStartDelay, readReportedDataInterval, TimeUnit.SECONDS);
             }
+
+            Duration connectionMonitorStartDelay = timeBetweenDataShouldArrive
+                    .plus(CONNECTION_MONITOR_START_DELAY_OFFSET);
+            connectionMonitor = scheduler.scheduleWithFixedDelay(this::verifyIfStillConnected,
+                    connectionMonitorStartDelay.getSeconds(), timeBetweenDataShouldArrive.getSeconds(),
+                    TimeUnit.SECONDS);
         } else {
             updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.OFFLINE.COMMUNICATION_ERROR,
                     "Commands and replies from the device don't seem to match");
-            logger.debug("Could not configure sensor -> setting Thing to OFFLINE and disposing the handler");
-            dispose();
+            logger.debug(
+                    "Could not configure sensor -> setting Thing to OFFLINE, disposing the handler and reschedule initialize in {} seconds",
+                    RETRY_INIT_DELAY);
+            doDispose(false);
+            retryInitJob = scheduler.schedule(this::initialize, RETRY_INIT_DELAY.getSeconds(), TimeUnit.SECONDS);
+        }
+    }
+
+    private boolean doInit(SDS011Communicator localCommunicator, WorkMode mode, Duration interval) {
+        boolean initSuccessful = false;
+        try {
+            initSuccessful = localCommunicator.initialize(mode, interval);
+        } catch (final IOException ex) {
+            updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.OFFLINE.COMMUNICATION_ERROR, "I/O error!");
+        } catch (PortInUseException e) {
+            updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.OFFLINE.COMMUNICATION_ERROR, "Port is in use!");
+        } catch (TooManyListenersException e) {
+            updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.OFFLINE.COMMUNICATION_ERROR,
+                    "Cannot attach listener to port, because there are too many listeners!");
+        } catch (UnsupportedCommOperationException e) {
+            updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.OFFLINE.COMMUNICATION_ERROR,
+                    "Cannot set serial port parameters");
         }
+        return initSuccessful;
     }
 
     private boolean validateConfiguration() {
@@ -194,10 +227,14 @@ public class SDS011Handler extends BaseThingHandler {
 
     @Override
     public void dispose() {
-        ScheduledFuture<?> localPollingJob = this.pollingJob;
+        doDispose(true);
+    }
+
+    private void doDispose(boolean sendDeviceToSleep) {
+        ScheduledFuture<?> localPollingJob = this.dataReadJob;
         if (localPollingJob != null) {
             localPollingJob.cancel(true);
-            this.pollingJob = null;
+            this.dataReadJob = null;
         }
 
         ScheduledFuture<?> localConnectionMonitor = this.connectionMonitor;
@@ -206,9 +243,15 @@ public class SDS011Handler extends BaseThingHandler {
             this.connectionMonitor = null;
         }
 
+        ScheduledFuture<?> localRetryOpenPortJob = this.retryInitJob;
+        if (localRetryOpenPortJob != null) {
+            localRetryOpenPortJob.cancel(true);
+            this.retryInitJob = null;
+        }
+
         SDS011Communicator localCommunicator = this.communicator;
         if (localCommunicator != null) {
-            localCommunicator.dispose();
+            localCommunicator.dispose(sendDeviceToSleep);
         }
 
         this.statePM10 = UnDefType.UNDEF;
@@ -248,7 +291,7 @@ public class SDS011Handler extends BaseThingHandler {
                     "Check connection cable and afterwards disable and enable this thing to make it work again");
             // in case someone has pulled the plug, we dispose ourselves and the user has to deactivate/activate the
             // thing once the cable is plugged in again
-            dispose();
+            doDispose(false);
         } else {
             logger.trace("Check Alive timer: All OK: lastCommunication={}, interval={}, tollerance={}",
                     lastCommunication, timeBetweenDataShouldArrive, dataCanBeLateTolerance);
index db438376acf0e68cf9e700ada3a3e7817654f97d..3e3c11e32980d2f8f2176f09c93350b3cb69e356 100644 (file)
@@ -18,6 +18,11 @@ import java.io.OutputStream;
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.TooManyListenersException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.eclipse.jdt.annotation.NonNullByDefault;
 import org.eclipse.jdt.annotation.Nullable;
@@ -32,8 +37,6 @@ import org.openhab.binding.novafinedust.internal.sds011protocol.messages.SleepRe
 import org.openhab.binding.novafinedust.internal.sds011protocol.messages.WorkingPeriodReply;
 import org.openhab.core.io.transport.serial.PortInUseException;
 import org.openhab.core.io.transport.serial.SerialPort;
-import org.openhab.core.io.transport.serial.SerialPortEvent;
-import org.openhab.core.io.transport.serial.SerialPortEventListener;
 import org.openhab.core.io.transport.serial.SerialPortIdentifier;
 import org.openhab.core.io.transport.serial.UnsupportedCommOperationException;
 import org.openhab.core.util.HexUtils;
@@ -47,7 +50,9 @@ import org.slf4j.LoggerFactory;
  *
  */
 @NonNullByDefault
-public class SDS011Communicator implements SerialPortEventListener {
+public class SDS011Communicator {
+
+    private static final int MAX_SENDOR_REPORTINGS_UNTIL_EXPECTED_REPLY = 20;
 
     private final Logger logger = LoggerFactory.getLogger(SDS011Communicator.class);
 
@@ -57,10 +62,13 @@ public class SDS011Communicator implements SerialPortEventListener {
 
     private @Nullable OutputStream outputStream;
     private @Nullable InputStream inputStream;
+    private @Nullable ScheduledExecutorService scheduler;
 
-    public SDS011Communicator(SDS011Handler thingHandler, SerialPortIdentifier portId) {
+    public SDS011Communicator(SDS011Handler thingHandler, SerialPortIdentifier portId,
+            ScheduledExecutorService scheduler) {
         this.thingHandler = thingHandler;
         this.portId = portId;
+        this.scheduler = scheduler;
     }
 
     /**
@@ -78,8 +86,12 @@ public class SDS011Communicator implements SerialPortEventListener {
             throws PortInUseException, TooManyListenersException, IOException, UnsupportedCommOperationException {
         boolean initSuccessful = true;
 
+        logger.trace("Initializing with mode={}, interval={}", mode, interval);
+
         SerialPort localSerialPort = portId.open(thingHandler.getThing().getUID().toString(), 2000);
+        logger.trace("Port opened, object is={}", localSerialPort);
         localSerialPort.setSerialPortParams(9600, 8, 1, 0);
+        logger.trace("Serial parameters set on port");
 
         outputStream = localSerialPort.getOutputStream();
         inputStream = localSerialPort.getInputStream();
@@ -87,24 +99,27 @@ public class SDS011Communicator implements SerialPortEventListener {
         if (inputStream == null || outputStream == null) {
             throw new IOException("Could not create input or outputstream for the port");
         }
+        logger.trace("Input and Outputstream opened for the port");
 
         // wake up the device
         initSuccessful &= sendSleep(false);
+        logger.trace("Wake up call done, initSuccessful={}", initSuccessful);
         initSuccessful &= getFirmware();
+        logger.trace("Firmware requested, initSuccessful={}", initSuccessful);
 
         if (mode == WorkMode.POLLING) {
             initSuccessful &= setMode(WorkMode.POLLING);
+            logger.trace("Polling mode set, initSuccessful={}", initSuccessful);
             initSuccessful &= setWorkingPeriod((byte) 0);
+            logger.trace("Working period for polling set, initSuccessful={}", initSuccessful);
         } else {
             // reporting
             initSuccessful &= setWorkingPeriod((byte) interval.toMinutes());
+            logger.trace("Working period for reporting set, initSuccessful={}", initSuccessful);
             initSuccessful &= setMode(WorkMode.REPORTING);
+            logger.trace("Reporting mode set, initSuccessful={}", initSuccessful);
         }
 
-        // enable listeners only after we have configured the sensor above because for configuring we send and read data
-        // sequentially
-        localSerialPort.notifyOnDataAvailable(true);
-        localSerialPort.addEventListener(this);
         this.serialPort = localSerialPort;
 
         return initSuccessful;
@@ -116,7 +131,12 @@ public class SDS011Communicator implements SerialPortEventListener {
             logger.debug("Will send command: {} ({})", HexUtils.bytesToHex(commandData), Arrays.toString(commandData));
         }
 
-        write(commandData);
+        try {
+            write(commandData);
+        } catch (IOException ioex) {
+            logger.debug("Got an exception while writing a command, will not try to fetch a reply for it.", ioex);
+            throw ioex;
+        }
 
         try {
             // Give the sensor some time to handle the command
@@ -127,9 +147,13 @@ public class SDS011Communicator implements SerialPortEventListener {
         }
         SensorReply reply = readReply();
         // in case there is still another reporting active, we want to discard the sensor data and read the reply to our
-        // command again
-        if (reply instanceof SensorMeasuredDataReply) {
-            reply = readReply();
+        // command again, this might happen more often in case the sensor has buffered some data
+        for (int i = 0; i < MAX_SENDOR_REPORTINGS_UNTIL_EXPECTED_REPLY; i++) {
+            if (reply instanceof SensorMeasuredDataReply) {
+                reply = readReply();
+            } else {
+                break;
+            }
         }
         return reply;
     }
@@ -218,7 +242,7 @@ public class SDS011Communicator implements SerialPortEventListener {
     }
 
     /**
-     * Request data from the device, they will be returned via the serialEvent callback
+     * Request data from the device
      *
      * @throws IOException
      */
@@ -229,6 +253,13 @@ public class SDS011Communicator implements SerialPortEventListener {
             logger.debug("Requesting sensor data, will send: {}", HexUtils.bytesToHex(data));
         }
         write(data);
+        try {
+            Thread.sleep(200); // give the device some time to handle the command
+        } catch (InterruptedException e) {
+            logger.warn("Interrupted while waiting before reading a reply to our rquest data command.");
+            Thread.currentThread().interrupt();
+        }
+        readSensorData();
     }
 
     private @Nullable SensorReply readReply() throws IOException {
@@ -237,7 +268,8 @@ public class SDS011Communicator implements SerialPortEventListener {
         InputStream localInpuStream = inputStream;
 
         int b = -1;
-        if (localInpuStream != null && localInpuStream.available() > 0) {
+        if (localInpuStream != null) {
+            logger.trace("Reading for reply until first byte is found");
             while ((b = localInpuStream.read()) != Constants.MESSAGE_START_AS_INT) {
                 logger.debug("Trying to find first reply byte now...");
             }
@@ -252,25 +284,16 @@ public class SDS011Communicator implements SerialPortEventListener {
         return null;
     }
 
-    /**
-     * Data from the device is arriving and will be parsed accordingly
-     */
-    @Override
-    public void serialEvent(SerialPortEvent event) {
-        if (event.getEventType() == SerialPortEvent.DATA_AVAILABLE) {
-            // we get here if data has been received
-            SensorReply reply = null;
-            try {
-                reply = readReply();
-                logger.debug("Got data from sensor: {}", reply);
-            } catch (IOException e) {
-                logger.warn("Could not read available data from the serial port: {}", e.getMessage());
-            }
-            if (reply instanceof SensorMeasuredDataReply) {
-                SensorMeasuredDataReply sensorData = (SensorMeasuredDataReply) reply;
-                if (sensorData.isValidData()) {
-                    thingHandler.updateChannels(sensorData);
-                }
+    public void readSensorData() throws IOException {
+        logger.trace("readSensorData() called");
+        SensorReply reply = readReply();
+        logger.trace("readSensorData(): Read reply={}", reply);
+        if (reply instanceof SensorMeasuredDataReply) {
+            SensorMeasuredDataReply sensorData = (SensorMeasuredDataReply) reply;
+            logger.trace("We received sensor data");
+            if (sensorData.isValidData()) {
+                logger.trace("Sensor data is valid => updating channels");
+                thingHandler.updateChannels(sensorData);
             }
         }
     }
@@ -278,38 +301,46 @@ public class SDS011Communicator implements SerialPortEventListener {
     /**
      * Shutdown the communication, i.e. send the device to sleep and close the serial port
      */
-    public void dispose() {
+    public void dispose(boolean sendtoSleep) {
         SerialPort localSerialPort = serialPort;
         if (localSerialPort != null) {
-            try {
-                // send the device to sleep to preserve power and extend the lifetime of the sensor
-                sendSleep(true);
-            } catch (IOException e) {
-                // ignore because we are shutting down anyway
-                logger.debug("Exception while disposing communicator (will ignore it)", e);
-            } finally {
-                localSerialPort.removeEventListener();
-                localSerialPort.close();
-                serialPort = null;
+            if (sendtoSleep) {
+                sendDeviceToSleepOnDispose();
             }
-        }
 
-        try {
-            InputStream localInputStream = inputStream;
-            if (localInputStream != null) {
-                localInputStream.close();
-            }
-        } catch (IOException e) {
-            logger.debug("Error while closing the input stream: {}", e.getMessage());
+            logger.debug("Closing the port now");
+            localSerialPort.close();
+
+            serialPort = null;
         }
+        this.scheduler = null;
+    }
 
-        try {
-            OutputStream localOutputStream = outputStream;
-            if (localOutputStream != null) {
-                localOutputStream.close();
+    private void sendDeviceToSleepOnDispose() {
+        @Nullable
+        ScheduledExecutorService localScheduler = scheduler;
+        if (localScheduler != null) {
+            Future<?> sleepJob = null;
+            try {
+                sleepJob = localScheduler.submit(() -> {
+                    try {
+                        sendSleep(true);
+                    } catch (IOException e) {
+                        logger.debug("Exception while sending sleep on disposing the communicator (will ignore it)", e);
+                    }
+                });
+                sleepJob.get(5, TimeUnit.SECONDS);
+            } catch (TimeoutException e) {
+                logger.warn("Could not send device to sleep, because command takes longer than 5 seconds.");
+                sleepJob.cancel(true);
+            } catch (ExecutionException e) {
+                logger.debug("Could not execute sleep command.", e);
+            } catch (InterruptedException e) {
+                logger.debug("Sending device to sleep was interrupted.");
+                Thread.currentThread().interrupt();
             }
-        } catch (IOException e) {
-            logger.debug("Error while closing the output stream: {}", e.getMessage());
+        } else {
+            logger.debug("Scheduler was null, could not send device to sleep.");
         }
     }
 }