2 * Copyright (c) 2010-2021 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.rfxcom.internal.connector;
15 import java.io.IOException;
16 import java.util.Arrays;
18 import org.openhab.binding.rfxcom.internal.exceptions.RFXComTimeoutException;
19 import org.slf4j.Logger;
20 import org.slf4j.LoggerFactory;
23 * RFXCOM stream reader to parse RFXCOM output into messages.
25 * @author Pauli Anttila - Initial contribution
26 * @author James Hewitt-Thomas - New class
27 * @author Mike Jagdis - Interruptible read loop
28 * @author Martin van Wingerden - Slight refactoring for read loop for TCP connector
30 public class RFXComStreamReader extends Thread {
31 private final Logger logger = LoggerFactory.getLogger(RFXComStreamReader.class);
32 private static final int MAX_READ_TIMEOUTS = 4;
33 private static final int MAX_RFXCOM_MESSAGE_LEN = 256;
35 private RFXComBaseConnector connector;
37 private class ExceptionHandler implements Thread.UncaughtExceptionHandler {
39 public void uncaughtException(Thread thread, Throwable throwable) {
40 logger.debug("Connector died: ", throwable);
41 connector.sendErrorToListeners("Connector died: " + throwable.getMessage());
45 public RFXComStreamReader(RFXComBaseConnector connector, String threadName) {
47 this.connector = connector;
48 setUncaughtExceptionHandler(new ExceptionHandler());
53 logger.debug("Data listener started");
54 byte[] buf = new byte[MAX_RFXCOM_MESSAGE_LEN];
56 // The stream has (or SHOULD have) a read timeout set. Taking a
57 // read timeout (read returns 0) between packets gives us a chance
58 // to check if we've been interrupted. Read interrupts during a
59 // packet are ignored but if too many timeouts occur we take it as
60 // meaning the RFXCOM has become missing presumed dead.
62 while (!Thread.interrupted()) {
63 // First byte tells us how long the packet is
64 int bytesRead = connector.read(buf, 0, 1);
65 int packetLength = buf[0];
67 if (bytesRead > 0 && packetLength > 0) {
68 logger.trace("Message length is {} bytes", packetLength);
69 processMessage(buf, packetLength);
70 connector.sendMsgToListeners(Arrays.copyOfRange(buf, 0, packetLength + 1));
71 } else if (bytesRead == -1) {
72 throw new IOException("End of stream");
75 } catch (IOException | RFXComTimeoutException e) {
76 logger.debug("Received exception, will report it to listeners", e);
77 connector.sendErrorToListeners(e.getMessage());
80 logger.debug("Data listener stopped");
83 private void processMessage(byte[] buf, int packetLength) throws IOException, RFXComTimeoutException {
84 // Now read the rest of the packet
86 int readTimeoutCount = 1;
87 while (bufferIndex <= packetLength) {
88 int bytesRemaining = packetLength - bufferIndex + 1;
89 logger.trace("Waiting remaining {} bytes from the message", bytesRemaining);
90 int bytesRead = connector.read(buf, bufferIndex, bytesRemaining);
92 logger.trace("Received {} bytes from the message", bytesRead);
93 bufferIndex += bytesRead;
95 } else if (readTimeoutCount++ == MAX_READ_TIMEOUTS) {
96 throw new RFXComTimeoutException("Timeout during packet read");