2 * Copyright (c) 2010-2023 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.rfxcom.internal.connector;
15 import static org.openhab.binding.rfxcom.internal.RFXComBindingConstants.MAX_RFXCOM_MESSAGE_LEN;
17 import java.io.IOException;
18 import java.util.Arrays;
20 import org.openhab.binding.rfxcom.internal.exceptions.RFXComTimeoutException;
21 import org.slf4j.Logger;
22 import org.slf4j.LoggerFactory;
25 * RFXCOM stream reader to parse RFXCOM output into messages.
27 * @author Pauli Anttila - Initial contribution
28 * @author James Hewitt-Thomas - New class
29 * @author Mike Jagdis - Interruptible read loop
30 * @author Martin van Wingerden - Slight refactoring for read loop for TCP connector
32 public class RFXComStreamReader extends Thread {
33 private final Logger logger = LoggerFactory.getLogger(RFXComStreamReader.class);
34 private static final int MAX_READ_TIMEOUTS = 4;
36 private RFXComBaseConnector connector;
38 private class ExceptionHandler implements Thread.UncaughtExceptionHandler {
40 public void uncaughtException(Thread thread, Throwable throwable) {
41 logger.debug("Connector died: ", throwable);
42 connector.sendErrorToListeners("Connector died: " + throwable.getMessage());
46 public RFXComStreamReader(RFXComBaseConnector connector, String threadName) {
48 this.connector = connector;
49 setUncaughtExceptionHandler(new ExceptionHandler());
54 logger.debug("Data listener started");
55 byte[] buf = new byte[MAX_RFXCOM_MESSAGE_LEN];
57 // The stream has (or SHOULD have) a read timeout set. Taking a
58 // read timeout (read returns 0) between packets gives us a chance
59 // to check if we've been interrupted. Read interrupts during a
60 // packet are ignored but if too many timeouts occur we take it as
61 // meaning the RFXCOM has become missing presumed dead.
63 while (!Thread.interrupted()) {
64 // First byte tells us how long the packet is
65 int bytesRead = connector.read(buf, 0, 1);
66 int packetLength = Byte.toUnsignedInt(buf[0]);
68 if (bytesRead > 0 && packetLength > 0) {
69 logger.trace("Message length is {} bytes", packetLength);
70 processMessage(buf, packetLength);
71 connector.sendMsgToListeners(Arrays.copyOfRange(buf, 0, packetLength + 1));
72 } else if (bytesRead == -1) {
73 throw new IOException("End of stream");
76 } catch (IOException | RFXComTimeoutException e) {
77 logger.debug("Received exception, will report it to listeners", e);
78 connector.sendErrorToListeners(e.getMessage());
81 logger.debug("Data listener stopped");
84 private void processMessage(byte[] buf, int packetLength) throws IOException, RFXComTimeoutException {
85 // Now read the rest of the packet
87 int readTimeoutCount = 1;
88 while (bufferIndex <= packetLength) {
89 int bytesRemaining = packetLength - bufferIndex + 1;
90 logger.trace("Waiting remaining {} bytes from the message", bytesRemaining);
91 int bytesRead = connector.read(buf, bufferIndex, bytesRemaining);
93 logger.trace("Received {} bytes from the message", bytesRead);
94 bufferIndex += bytesRead;
96 } else if (readTimeoutCount++ == MAX_READ_TIMEOUTS) {
97 throw new RFXComTimeoutException("Timeout during packet read");