]> git.basschouten.com Git - openhab-addons.git/blob
b7774e6671013a044eda18da24a3b22cc443e180
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2020 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.rfxcom.internal.connector;
14
15 import java.io.IOException;
16 import java.util.Arrays;
17
18 import org.openhab.binding.rfxcom.internal.exceptions.RFXComTimeoutException;
19 import org.slf4j.Logger;
20 import org.slf4j.LoggerFactory;
21
22 /**
23  * RFXCOM stream reader to parse RFXCOM output into messages.
24  *
25  * @author Pauli Anttila - Initial contribution
26  * @author James Hewitt-Thomas - New class
27  * @author Mike Jagdis - Interruptible read loop
28  * @author Martin van Wingerden - Slight refactoring for read loop for TCP connector
29  */
30 public class RFXComStreamReader extends Thread {
31     private final Logger logger = LoggerFactory.getLogger(RFXComStreamReader.class);
32     private static final int MAX_READ_TIMEOUTS = 4;
33     private static final int MAX_RFXCOM_MESSAGE_LEN = 256;
34
35     private RFXComBaseConnector connector;
36
37     private class ExceptionHandler implements Thread.UncaughtExceptionHandler {
38         @Override
39         public void uncaughtException(Thread thread, Throwable throwable) {
40             logger.debug("Connector died: ", throwable);
41             connector.sendErrorToListeners("Connector died: " + throwable.getMessage());
42         }
43     }
44
45     public RFXComStreamReader(RFXComBaseConnector connector, String threadName) {
46         super(threadName);
47         this.connector = connector;
48         setUncaughtExceptionHandler(new ExceptionHandler());
49     }
50
51     @Override
52     public void run() {
53         logger.debug("Data listener started");
54         byte[] buf = new byte[MAX_RFXCOM_MESSAGE_LEN];
55
56         // The stream has (or SHOULD have) a read timeout set. Taking a
57         // read timeout (read returns 0) between packets gives us a chance
58         // to check if we've been interrupted. Read interrupts during a
59         // packet are ignored but if too many timeouts occur we take it as
60         // meaning the RFXCOM has become missing presumed dead.
61         try {
62             while (!Thread.interrupted()) {
63                 // First byte tells us how long the packet is
64                 int bytesRead = connector.read(buf, 0, 1);
65                 int packetLength = buf[0];
66
67                 if (bytesRead > 0 && packetLength > 0) {
68                     logger.trace("Message length is {} bytes", packetLength);
69                     processMessage(buf, packetLength);
70                     connector.sendMsgToListeners(Arrays.copyOfRange(buf, 0, packetLength + 1));
71                 } else if (bytesRead == -1) {
72                     throw new IOException("End of stream");
73                 }
74             }
75         } catch (IOException | RFXComTimeoutException e) {
76             logger.debug("Received exception, will report it to listeners", e);
77             connector.sendErrorToListeners(e.getMessage());
78         }
79
80         logger.debug("Data listener stopped");
81     }
82
83     private void processMessage(byte[] buf, int packetLength) throws IOException, RFXComTimeoutException {
84         // Now read the rest of the packet
85         int bufferIndex = 1;
86         int readTimeoutCount = 1;
87         while (bufferIndex <= packetLength) {
88             int bytesRemaining = packetLength - bufferIndex + 1;
89             logger.trace("Waiting remaining {} bytes from the message", bytesRemaining);
90             int bytesRead = connector.read(buf, bufferIndex, bytesRemaining);
91             if (bytesRead > 0) {
92                 logger.trace("Received {} bytes from the message", bytesRead);
93                 bufferIndex += bytesRead;
94                 readTimeoutCount = 1;
95             } else if (readTimeoutCount++ == MAX_READ_TIMEOUTS) {
96                 throw new RFXComTimeoutException("Timeout during packet read");
97             }
98         }
99     }
100 }