2 * Copyright (c) 2010-2020 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.velux.internal.bridge.slip.io;
15 import java.io.Closeable;
16 import java.io.IOException;
17 import java.io.InputStream;
18 import java.net.SocketTimeoutException;
19 import java.util.Arrays;
20 import java.util.NoSuchElementException;
21 import java.util.Queue;
22 import java.util.concurrent.Callable;
23 import java.util.concurrent.ConcurrentLinkedQueue;
24 import java.util.concurrent.ExecutorService;
25 import java.util.concurrent.Executors;
27 import org.eclipse.jdt.annotation.NonNullByDefault;
28 import org.eclipse.jdt.annotation.Nullable;
29 import org.openhab.binding.velux.internal.handler.VeluxBridgeHandler;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
34 * This is an wrapper around {@link java.io.InputStream} to support socket receive operations.
36 * It implements a secondary polling thread to asynchronously read bytes from the socket input stream into a buffer. And
37 * it parses the bytes into SLIP messages, which are placed on a message queue. Callers can access the SLIP messages in
38 * this queue independently from the polling thread.
40 * @author Guenther Schreiner - Initial contribution.
41 * @author Andrew Fiddian-Green - Complete rewrite using asynchronous polling thread.
44 class DataInputStreamWithTimeout implements Closeable {
46 private static final int QUEUE_SIZE = 512;
47 private static final int BUFFER_SIZE = 512;
48 private static final int SLEEP_INTERVAL_MSECS = 50;
50 // special character that marks the first and last byte of a slip message
51 private static final byte SLIP_MARK = (byte) 0xc0;
53 private final Logger logger = LoggerFactory.getLogger(DataInputStreamWithTimeout.class);
55 private final Queue<byte[]> slipMessageQueue = new ConcurrentLinkedQueue<>();
57 private InputStream inputStream;
59 private @Nullable String pollException = null;
60 private @Nullable Poller pollRunner = null;
61 private ExecutorService executor;
63 private class Poller implements Callable<Boolean> {
65 private boolean interrupted = false;
67 public void interrupt() {
72 * Task that loops to read bytes from {@link InputStream} and build SLIP packets from them. The SLIP packets are
73 * placed in a {@link ConcurrentLinkedQueue}. It loops continuously until 'interrupt()' or 'Thread.interrupt()'
74 * are called when terminates early after the next socket read timeout.
77 public Boolean call() throws Exception {
78 byte[] buf = new byte[BUFFER_SIZE];
82 // clean start, no exception, empty queue
84 slipMessageQueue.clear();
86 // loop forever or until internally or externally interrupted
87 while ((!interrupted) && (!Thread.interrupted())) {
89 buf[i] = byt = (byte) inputStream.read();
90 if (byt == SLIP_MARK) {
92 // the minimal slip message is 7 bytes [MM PP LL CC CC KK MM]
93 if ((i > 5) && (buf[0] == SLIP_MARK)) {
94 slipMessageQueue.offer(Arrays.copyOfRange(buf, 0, i + 1));
95 if (slipMessageQueue.size() > QUEUE_SIZE) {
96 logger.warn("pollRunner() => slip message queue overflow => PLEASE REPORT !!");
97 slipMessageQueue.poll();
105 if (++i >= BUFFER_SIZE) {
108 } catch (SocketTimeoutException e) {
109 // socket read time outs are OK => keep on polling
111 } catch (IOException e) {
112 // any other exception => stop polling
113 String msg = e.getMessage();
114 pollException = msg != null ? msg : "Generic IOException";
115 logger.debug("pollRunner() stopping '{}'", pollException);
120 // we only get here if shutdown or an error occurs so free ourself so we can be recreated again
127 * Check if there was an exception on the polling loop task and if so, throw it back on the caller thread.
129 * @throws IOException
131 private void throwIfPollException() throws IOException {
132 if (pollException != null) {
133 logger.debug("passPollException() polling loop exception {}", pollException);
134 throw new IOException(pollException);
139 * Creates a {@link DataInputStreamWithTimeout} as a wrapper around the specified underlying {@link InputStream}
141 * @param stream the specified input stream
142 * @param bridge the actual Bridge Thing instance
144 public DataInputStreamWithTimeout(InputStream stream, VeluxBridgeHandler bridge) {
145 inputStream = stream;
146 executor = Executors.newSingleThreadExecutor(bridge.getThreadFactory());
150 * Overridden method of {@link Closeable} interface. Stops the polling thread.
152 * @throws IOException
155 public void close() throws IOException {
160 * Reads and removes the next available SLIP message from the queue. If the queue is empty, continue polling
161 * until either a message is found, or the timeout expires.
163 * @param timeoutMSecs the timeout period in milliseconds.
164 * @return the next SLIP message if there is one on the queue, or any empty byte[] array if not.
165 * @throws IOException
167 public synchronized byte[] readSlipMessage(int timeoutMSecs) throws IOException {
169 int i = (timeoutMSecs / SLEEP_INTERVAL_MSECS) + 1;
172 byte[] slip = slipMessageQueue.remove();
173 logger.trace("readSlipMessage() => return slip message");
175 } catch (NoSuchElementException e) {
176 // queue empty, wait and continue
178 throwIfPollException();
180 Thread.sleep(SLEEP_INTERVAL_MSECS);
181 } catch (InterruptedException e) {
182 logger.debug("readSlipMessage() => thread interrupt");
183 throw new IOException("Thread Interrupted");
186 logger.debug("readSlipMessage() => no slip message after {}mS => time out", timeoutMSecs);
191 * Get the number of incoming messages in the queue
193 * @return the number of incoming messages in the queue
195 public int available() {
196 int size = slipMessageQueue.size();
197 logger.trace("available() => slip message count {}", size);
204 public void flush() {
205 logger.trace("flush() called");
206 slipMessageQueue.clear();
210 * Start the polling task
212 private void startPolling() {
213 Poller pollRunner = this.pollRunner;
214 if (pollRunner == null) {
215 logger.trace("startPolling()");
216 pollRunner = this.pollRunner = new Poller();
217 executor.submit(pollRunner);
222 * Stop the polling task
224 private void stopPolling() {
225 Poller pollRunner = this.pollRunner;
226 if (pollRunner != null) {
227 logger.trace("stopPolling()");
228 pollRunner.interrupt();
229 this.pollRunner = null;