2 * Copyright (c) 2010-2021 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.velux.internal.bridge.slip.io;
15 import java.io.Closeable;
16 import java.io.IOException;
17 import java.io.InputStream;
18 import java.net.SocketTimeoutException;
19 import java.util.Arrays;
20 import java.util.NoSuchElementException;
21 import java.util.Queue;
22 import java.util.concurrent.Callable;
23 import java.util.concurrent.ConcurrentLinkedQueue;
24 import java.util.concurrent.ExecutionException;
25 import java.util.concurrent.ExecutorService;
26 import java.util.concurrent.Executors;
27 import java.util.concurrent.Future;
29 import org.eclipse.jdt.annotation.NonNullByDefault;
30 import org.eclipse.jdt.annotation.Nullable;
31 import org.openhab.binding.velux.internal.handler.VeluxBridgeHandler;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
36 * This is an wrapper around {@link java.io.InputStream} to support socket receive operations.
38 * It implements a secondary polling thread to asynchronously read bytes from the socket input stream into a buffer. And
39 * it parses the bytes into SLIP messages, which are placed on a message queue. Callers can access the SLIP messages in
40 * this queue independently from the polling thread.
42 * @author Guenther Schreiner - Initial contribution.
43 * @author Andrew Fiddian-Green - Complete rewrite using asynchronous polling thread.
46 class DataInputStreamWithTimeout implements Closeable {
48 private static final int QUEUE_SIZE = 512;
49 private static final int BUFFER_SIZE = 512;
50 private static final int SLEEP_INTERVAL_MSECS = 50;
52 // special character that marks the first and last byte of a slip message
53 private static final byte SLIP_MARK = (byte) 0xc0;
54 private static final byte SLIP_PROT = 0;
56 private final Logger logger = LoggerFactory.getLogger(DataInputStreamWithTimeout.class);
58 private final Queue<byte[]> slipMessageQueue = new ConcurrentLinkedQueue<>();
60 private InputStream inputStream;
62 private @Nullable String pollException = null;
63 private @Nullable Poller pollRunner = null;
64 private ExecutorService executor;
66 private class Poller implements Callable<Boolean> {
68 private boolean interrupted = false;
69 private Future<Boolean> pollerFinished;
71 public Poller(ExecutorService executor) {
72 logger.trace("Poller: created");
73 pollerFinished = executor.submit(this);
76 public void interrupt() {
80 } catch (InterruptedException | ExecutionException e) {
85 * Task that loops to read bytes from {@link InputStream} and build SLIP packets from them. The SLIP packets are
86 * placed in a {@link ConcurrentLinkedQueue}. It loops continuously until 'interrupt()' or 'Thread.interrupt()'
87 * are called when terminates early after the next socket read timeout.
90 public Boolean call() throws Exception {
91 logger.trace("Poller.call(): started");
92 byte[] buf = new byte[BUFFER_SIZE];
96 // clean start, no exception, empty queue
98 slipMessageQueue.clear();
100 // loop forever or until externally interrupted
101 while (!Thread.interrupted()) {
104 // fully flush the input buffer
105 inputStream.readAllBytes();
108 byt = inputStream.read();
110 // end of stream is OK => keep on polling
114 if ((i > 0) && (buf[i] == SLIP_MARK)) {
115 // the minimal slip message is 7 bytes [MM PP LL CC CC KK MM]
116 if ((i > 5) && (buf[0] == SLIP_MARK) && (buf[1] == SLIP_PROT)) {
117 slipMessageQueue.offer(Arrays.copyOfRange(buf, 0, i + 1));
118 if (slipMessageQueue.size() > QUEUE_SIZE) {
119 logger.warn("Poller.call(): slip message queue overflow => PLEASE REPORT !!");
120 slipMessageQueue.poll();
124 logger.warn("Poller.call(): non slip messsage discarded => PLEASE REPORT !!");
130 if (++i >= BUFFER_SIZE) {
131 logger.warn("Poller.call(): input buffer overrun => PLEASE REPORT !!");
134 } catch (SocketTimeoutException e) {
135 // socket read time outs are OK => keep on polling
137 } catch (IOException e) {
138 // any other exception => stop polling
139 String msg = e.getMessage();
140 pollException = msg != null ? msg : "Generic IOException";
141 logger.debug("Poller.call(): stopping '{}'", pollException);
146 logger.trace("Poller.call(): ended");
147 // we only get here if shutdown or an error occurs so free ourself so we can be recreated again
154 * Check if there was an exception on the polling loop task and if so, throw it back on the caller thread.
156 * @throws IOException
158 private void throwIfPollException() throws IOException {
159 if (pollException != null) {
160 logger.debug("passPollException() polling loop exception {}", pollException);
161 throw new IOException(pollException);
166 * Creates a {@link DataInputStreamWithTimeout} as a wrapper around the specified underlying {@link InputStream}
168 * @param stream the specified input stream
169 * @param bridge the actual Bridge Thing instance
171 public DataInputStreamWithTimeout(InputStream stream, VeluxBridgeHandler bridge) {
172 inputStream = stream;
173 executor = Executors.newSingleThreadExecutor(bridge.getThreadFactory());
177 * Overridden method of {@link Closeable} interface. Stops the polling thread.
179 * @throws IOException
182 public void close() throws IOException {
187 * Reads and removes the next available SLIP message from the queue. If the queue is empty, continue polling
188 * until either a message is found, or the timeout expires.
190 * @param timeoutMSecs the timeout period in milliseconds.
191 * @return the next SLIP message if there is one on the queue, or any empty byte[] array if not.
192 * @throws IOException
194 public synchronized byte[] readSlipMessage(int timeoutMSecs) throws IOException {
196 int i = (timeoutMSecs / SLEEP_INTERVAL_MSECS) + 1;
199 byte[] slip = slipMessageQueue.remove();
200 logger.trace("readSlipMessage() => return slip message");
202 } catch (NoSuchElementException e) {
203 // queue empty, wait and continue
205 throwIfPollException();
207 Thread.sleep(SLEEP_INTERVAL_MSECS);
208 } catch (InterruptedException e) {
209 logger.debug("readSlipMessage() => thread interrupt");
210 throw new IOException("Thread Interrupted");
213 logger.debug("readSlipMessage() => no slip message after {}mS => time out", timeoutMSecs);
218 * Get the number of incoming messages in the queue
220 * @return the number of incoming messages in the queue
222 public int available() {
223 int size = slipMessageQueue.size();
224 logger.trace("available() => slip message count {}", size);
231 public void flush() {
232 logger.trace("flush() called");
233 slipMessageQueue.clear();
237 * Start the polling task
239 private void startPolling() {
240 if (pollRunner == null) {
241 logger.trace("startPolling()");
242 pollRunner = new Poller(executor);
247 * Stop the polling task
249 private void stopPolling() {
250 Poller pollRunner = this.pollRunner;
251 if (pollRunner != null) {
252 logger.trace("stopPolling()");
253 pollRunner.interrupt();