@NonNullByDefault
public class SDS011Handler extends BaseThingHandler {
private static final Duration CONNECTION_MONITOR_START_DELAY_OFFSET = Duration.ofSeconds(10);
+ private static final Duration RETRY_INIT_DELAY = Duration.ofSeconds(10);
private final Logger logger = LoggerFactory.getLogger(SDS011Handler.class);
private final SerialPortManager serialPortManager;
private NovaFineDustConfiguration config = new NovaFineDustConfiguration();
private @Nullable SDS011Communicator communicator;
- private @Nullable ScheduledFuture<?> pollingJob;
+ private @Nullable ScheduledFuture<?> dataReadJob;
private @Nullable ScheduledFuture<?> connectionMonitor;
+ private @Nullable ScheduledFuture<?> retryInitJob;
private ZonedDateTime lastCommunication = ZonedDateTime.now();
return;
}
- // parse ports and if the port is found, initialize the reader
+ // parse port and if the port is found, initialize the reader
SerialPortIdentifier portId = serialPortManager.getIdentifier(config.port);
if (portId == null) {
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.OFFLINE.CONFIGURATION_ERROR, "Port is not known!");
+ logger.debug("Serial port {} was not found, retrying in {}.", config.port, RETRY_INIT_DELAY);
+ retryInitJob = scheduler.schedule(this::initialize, RETRY_INIT_DELAY.getSeconds(), TimeUnit.SECONDS);
return;
}
- this.communicator = new SDS011Communicator(this, portId);
+ this.communicator = new SDS011Communicator(this, portId, scheduler);
if (config.reporting) {
timeBetweenDataShouldArrive = Duration.ofMinutes(config.reportingInterval);
timeBetweenDataShouldArrive = Duration.ofSeconds(config.pollingInterval);
scheduler.submit(() -> initializeCommunicator(WorkMode.POLLING, timeBetweenDataShouldArrive));
}
-
- Duration connectionMonitorStartDelay = timeBetweenDataShouldArrive.plus(CONNECTION_MONITOR_START_DELAY_OFFSET);
- connectionMonitor = scheduler.scheduleWithFixedDelay(this::verifyIfStillConnected,
- connectionMonitorStartDelay.getSeconds(), timeBetweenDataShouldArrive.getSeconds(), TimeUnit.SECONDS);
}
private void initializeCommunicator(WorkMode mode, Duration interval) {
SDS011Communicator localCommunicator = communicator;
if (localCommunicator == null) {
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.OFFLINE.COMMUNICATION_ERROR,
- "Could not create communicator instance");
+ "Communicator instance is null in initializeCommunicator()");
return;
}
boolean initSuccessful = false;
- try {
- initSuccessful = localCommunicator.initialize(mode, interval);
- } catch (final IOException ex) {
- updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.OFFLINE.COMMUNICATION_ERROR, "I/O error!");
- return;
- } catch (PortInUseException e) {
- updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.OFFLINE.COMMUNICATION_ERROR, "Port is in use!");
- return;
- } catch (TooManyListenersException e) {
- updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.OFFLINE.COMMUNICATION_ERROR,
- "Cannot attach listener to port!");
- return;
- } catch (UnsupportedCommOperationException e) {
- updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.OFFLINE.COMMUNICATION_ERROR,
- "Cannot set serial port parameters");
- return;
+ int retryInit = 3;
+ int retryCount = 0;
+ // sometimes the device is a little difficult and needs multiple configuration attempts
+ while (!initSuccessful && retryCount < retryInit) {
+ logger.trace("Trying to initialize device attempt={}", retryCount);
+ initSuccessful = doInit(localCommunicator, mode, interval);
+ retryCount++;
}
if (initSuccessful) {
updateStatus(ThingStatus.ONLINE);
if (mode == WorkMode.POLLING) {
- pollingJob = scheduler.scheduleWithFixedDelay(() -> {
+ dataReadJob = scheduler.scheduleWithFixedDelay(() -> {
try {
localCommunicator.requestSensorData();
} catch (IOException e) {
"Cannot query data from device");
}
}, 2, config.pollingInterval, TimeUnit.SECONDS);
+ } else {
+ // start a job that reads the port until data arrives
+ int reportingReadStartDelay = 10;
+ int startReadBeforeDataArrives = 5;
+ long readReportedDataInterval = (config.reportingInterval * 60) - reportingReadStartDelay
+ - startReadBeforeDataArrives;
+ logger.trace("Scheduling job to receive reported values");
+ dataReadJob = scheduler.scheduleWithFixedDelay(() -> {
+ try {
+ localCommunicator.readSensorData();
+ } catch (IOException e) {
+ updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.OFFLINE.COMMUNICATION_ERROR,
+ "Cannot query data from device, because: " + e.getMessage());
+ }
+ }, reportingReadStartDelay, readReportedDataInterval, TimeUnit.SECONDS);
}
+
+ Duration connectionMonitorStartDelay = timeBetweenDataShouldArrive
+ .plus(CONNECTION_MONITOR_START_DELAY_OFFSET);
+ connectionMonitor = scheduler.scheduleWithFixedDelay(this::verifyIfStillConnected,
+ connectionMonitorStartDelay.getSeconds(), timeBetweenDataShouldArrive.getSeconds(),
+ TimeUnit.SECONDS);
} else {
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.OFFLINE.COMMUNICATION_ERROR,
"Commands and replies from the device don't seem to match");
- logger.debug("Could not configure sensor -> setting Thing to OFFLINE and disposing the handler");
- dispose();
+ logger.debug(
+ "Could not configure sensor -> setting Thing to OFFLINE, disposing the handler and reschedule initialize in {} seconds",
+ RETRY_INIT_DELAY);
+ doDispose(false);
+ retryInitJob = scheduler.schedule(this::initialize, RETRY_INIT_DELAY.getSeconds(), TimeUnit.SECONDS);
+ }
+ }
+
+ private boolean doInit(SDS011Communicator localCommunicator, WorkMode mode, Duration interval) {
+ boolean initSuccessful = false;
+ try {
+ initSuccessful = localCommunicator.initialize(mode, interval);
+ } catch (final IOException ex) {
+ updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.OFFLINE.COMMUNICATION_ERROR, "I/O error!");
+ } catch (PortInUseException e) {
+ updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.OFFLINE.COMMUNICATION_ERROR, "Port is in use!");
+ } catch (TooManyListenersException e) {
+ updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.OFFLINE.COMMUNICATION_ERROR,
+ "Cannot attach listener to port, because there are too many listeners!");
+ } catch (UnsupportedCommOperationException e) {
+ updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.OFFLINE.COMMUNICATION_ERROR,
+ "Cannot set serial port parameters");
}
+ return initSuccessful;
}
private boolean validateConfiguration() {
@Override
public void dispose() {
- ScheduledFuture<?> localPollingJob = this.pollingJob;
+ doDispose(true);
+ }
+
+ private void doDispose(boolean sendDeviceToSleep) {
+ ScheduledFuture<?> localPollingJob = this.dataReadJob;
if (localPollingJob != null) {
localPollingJob.cancel(true);
- this.pollingJob = null;
+ this.dataReadJob = null;
}
ScheduledFuture<?> localConnectionMonitor = this.connectionMonitor;
this.connectionMonitor = null;
}
+ ScheduledFuture<?> localRetryOpenPortJob = this.retryInitJob;
+ if (localRetryOpenPortJob != null) {
+ localRetryOpenPortJob.cancel(true);
+ this.retryInitJob = null;
+ }
+
SDS011Communicator localCommunicator = this.communicator;
if (localCommunicator != null) {
- localCommunicator.dispose();
+ localCommunicator.dispose(sendDeviceToSleep);
}
this.statePM10 = UnDefType.UNDEF;
"Check connection cable and afterwards disable and enable this thing to make it work again");
// in case someone has pulled the plug, we dispose ourselves and the user has to deactivate/activate the
// thing once the cable is plugged in again
- dispose();
+ doDispose(false);
} else {
logger.trace("Check Alive timer: All OK: lastCommunication={}, interval={}, tollerance={}",
lastCommunication, timeBetweenDataShouldArrive, dataCanBeLateTolerance);
import java.time.Duration;
import java.util.Arrays;
import java.util.TooManyListenersException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.openhab.binding.novafinedust.internal.sds011protocol.messages.WorkingPeriodReply;
import org.openhab.core.io.transport.serial.PortInUseException;
import org.openhab.core.io.transport.serial.SerialPort;
-import org.openhab.core.io.transport.serial.SerialPortEvent;
-import org.openhab.core.io.transport.serial.SerialPortEventListener;
import org.openhab.core.io.transport.serial.SerialPortIdentifier;
import org.openhab.core.io.transport.serial.UnsupportedCommOperationException;
import org.openhab.core.util.HexUtils;
*
*/
@NonNullByDefault
-public class SDS011Communicator implements SerialPortEventListener {
+public class SDS011Communicator {
+
+ private static final int MAX_SENDOR_REPORTINGS_UNTIL_EXPECTED_REPLY = 20;
private final Logger logger = LoggerFactory.getLogger(SDS011Communicator.class);
private @Nullable OutputStream outputStream;
private @Nullable InputStream inputStream;
+ private @Nullable ScheduledExecutorService scheduler;
- public SDS011Communicator(SDS011Handler thingHandler, SerialPortIdentifier portId) {
+ public SDS011Communicator(SDS011Handler thingHandler, SerialPortIdentifier portId,
+ ScheduledExecutorService scheduler) {
this.thingHandler = thingHandler;
this.portId = portId;
+ this.scheduler = scheduler;
}
/**
throws PortInUseException, TooManyListenersException, IOException, UnsupportedCommOperationException {
boolean initSuccessful = true;
+ logger.trace("Initializing with mode={}, interval={}", mode, interval);
+
SerialPort localSerialPort = portId.open(thingHandler.getThing().getUID().toString(), 2000);
+ logger.trace("Port opened, object is={}", localSerialPort);
localSerialPort.setSerialPortParams(9600, 8, 1, 0);
+ logger.trace("Serial parameters set on port");
outputStream = localSerialPort.getOutputStream();
inputStream = localSerialPort.getInputStream();
if (inputStream == null || outputStream == null) {
throw new IOException("Could not create input or outputstream for the port");
}
+ logger.trace("Input and Outputstream opened for the port");
// wake up the device
initSuccessful &= sendSleep(false);
+ logger.trace("Wake up call done, initSuccessful={}", initSuccessful);
initSuccessful &= getFirmware();
+ logger.trace("Firmware requested, initSuccessful={}", initSuccessful);
if (mode == WorkMode.POLLING) {
initSuccessful &= setMode(WorkMode.POLLING);
+ logger.trace("Polling mode set, initSuccessful={}", initSuccessful);
initSuccessful &= setWorkingPeriod((byte) 0);
+ logger.trace("Working period for polling set, initSuccessful={}", initSuccessful);
} else {
// reporting
initSuccessful &= setWorkingPeriod((byte) interval.toMinutes());
+ logger.trace("Working period for reporting set, initSuccessful={}", initSuccessful);
initSuccessful &= setMode(WorkMode.REPORTING);
+ logger.trace("Reporting mode set, initSuccessful={}", initSuccessful);
}
- // enable listeners only after we have configured the sensor above because for configuring we send and read data
- // sequentially
- localSerialPort.notifyOnDataAvailable(true);
- localSerialPort.addEventListener(this);
this.serialPort = localSerialPort;
return initSuccessful;
logger.debug("Will send command: {} ({})", HexUtils.bytesToHex(commandData), Arrays.toString(commandData));
}
- write(commandData);
+ try {
+ write(commandData);
+ } catch (IOException ioex) {
+ logger.debug("Got an exception while writing a command, will not try to fetch a reply for it.", ioex);
+ throw ioex;
+ }
try {
// Give the sensor some time to handle the command
}
SensorReply reply = readReply();
// in case there is still another reporting active, we want to discard the sensor data and read the reply to our
- // command again
- if (reply instanceof SensorMeasuredDataReply) {
- reply = readReply();
+ // command again, this might happen more often in case the sensor has buffered some data
+ for (int i = 0; i < MAX_SENDOR_REPORTINGS_UNTIL_EXPECTED_REPLY; i++) {
+ if (reply instanceof SensorMeasuredDataReply) {
+ reply = readReply();
+ } else {
+ break;
+ }
}
return reply;
}
}
/**
- * Request data from the device, they will be returned via the serialEvent callback
+ * Request data from the device
*
* @throws IOException
*/
logger.debug("Requesting sensor data, will send: {}", HexUtils.bytesToHex(data));
}
write(data);
+ try {
+ Thread.sleep(200); // give the device some time to handle the command
+ } catch (InterruptedException e) {
+ logger.warn("Interrupted while waiting before reading a reply to our rquest data command.");
+ Thread.currentThread().interrupt();
+ }
+ readSensorData();
}
private @Nullable SensorReply readReply() throws IOException {
InputStream localInpuStream = inputStream;
int b = -1;
- if (localInpuStream != null && localInpuStream.available() > 0) {
+ if (localInpuStream != null) {
+ logger.trace("Reading for reply until first byte is found");
while ((b = localInpuStream.read()) != Constants.MESSAGE_START_AS_INT) {
logger.debug("Trying to find first reply byte now...");
}
return null;
}
- /**
- * Data from the device is arriving and will be parsed accordingly
- */
- @Override
- public void serialEvent(SerialPortEvent event) {
- if (event.getEventType() == SerialPortEvent.DATA_AVAILABLE) {
- // we get here if data has been received
- SensorReply reply = null;
- try {
- reply = readReply();
- logger.debug("Got data from sensor: {}", reply);
- } catch (IOException e) {
- logger.warn("Could not read available data from the serial port: {}", e.getMessage());
- }
- if (reply instanceof SensorMeasuredDataReply) {
- SensorMeasuredDataReply sensorData = (SensorMeasuredDataReply) reply;
- if (sensorData.isValidData()) {
- thingHandler.updateChannels(sensorData);
- }
+ public void readSensorData() throws IOException {
+ logger.trace("readSensorData() called");
+ SensorReply reply = readReply();
+ logger.trace("readSensorData(): Read reply={}", reply);
+ if (reply instanceof SensorMeasuredDataReply) {
+ SensorMeasuredDataReply sensorData = (SensorMeasuredDataReply) reply;
+ logger.trace("We received sensor data");
+ if (sensorData.isValidData()) {
+ logger.trace("Sensor data is valid => updating channels");
+ thingHandler.updateChannels(sensorData);
}
}
}
/**
* Shutdown the communication, i.e. send the device to sleep and close the serial port
*/
- public void dispose() {
+ public void dispose(boolean sendtoSleep) {
SerialPort localSerialPort = serialPort;
if (localSerialPort != null) {
- try {
- // send the device to sleep to preserve power and extend the lifetime of the sensor
- sendSleep(true);
- } catch (IOException e) {
- // ignore because we are shutting down anyway
- logger.debug("Exception while disposing communicator (will ignore it)", e);
- } finally {
- localSerialPort.removeEventListener();
- localSerialPort.close();
- serialPort = null;
+ if (sendtoSleep) {
+ sendDeviceToSleepOnDispose();
}
- }
- try {
- InputStream localInputStream = inputStream;
- if (localInputStream != null) {
- localInputStream.close();
- }
- } catch (IOException e) {
- logger.debug("Error while closing the input stream: {}", e.getMessage());
+ logger.debug("Closing the port now");
+ localSerialPort.close();
+
+ serialPort = null;
}
+ this.scheduler = null;
+ }
- try {
- OutputStream localOutputStream = outputStream;
- if (localOutputStream != null) {
- localOutputStream.close();
+ private void sendDeviceToSleepOnDispose() {
+ @Nullable
+ ScheduledExecutorService localScheduler = scheduler;
+ if (localScheduler != null) {
+ Future<?> sleepJob = null;
+ try {
+ sleepJob = localScheduler.submit(() -> {
+ try {
+ sendSleep(true);
+ } catch (IOException e) {
+ logger.debug("Exception while sending sleep on disposing the communicator (will ignore it)", e);
+ }
+ });
+ sleepJob.get(5, TimeUnit.SECONDS);
+ } catch (TimeoutException e) {
+ logger.warn("Could not send device to sleep, because command takes longer than 5 seconds.");
+ sleepJob.cancel(true);
+ } catch (ExecutionException e) {
+ logger.debug("Could not execute sleep command.", e);
+ } catch (InterruptedException e) {
+ logger.debug("Sending device to sleep was interrupted.");
+ Thread.currentThread().interrupt();
}
- } catch (IOException e) {
- logger.debug("Error while closing the output stream: {}", e.getMessage());
+ } else {
+ logger.debug("Scheduler was null, could not send device to sleep.");
}
}
}