2 * Copyright (c) 2010-2022 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.velux.internal.bridge.slip.io;
15 import java.io.Closeable;
16 import java.io.IOException;
17 import java.io.InputStream;
18 import java.net.SocketTimeoutException;
19 import java.util.Arrays;
20 import java.util.NoSuchElementException;
21 import java.util.Queue;
22 import java.util.concurrent.Callable;
23 import java.util.concurrent.ConcurrentLinkedQueue;
24 import java.util.concurrent.ExecutionException;
25 import java.util.concurrent.ExecutorService;
26 import java.util.concurrent.Executors;
27 import java.util.concurrent.Future;
29 import org.eclipse.jdt.annotation.NonNullByDefault;
30 import org.eclipse.jdt.annotation.Nullable;
31 import org.openhab.binding.velux.internal.handler.VeluxBridgeHandler;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
36 * This is a wrapper around {@link java.io.InputStream} to support socket receive operations.
38 * It implements a secondary polling thread to asynchronously read bytes from the socket input stream into a buffer. And
39 * it parses the bytes into SLIP messages, which are placed on a message queue. Callers can access the SLIP messages in
40 * this queue independently from the polling thread.
42 * @author Guenther Schreiner - Initial contribution.
43 * @author Andrew Fiddian-Green - Complete rewrite using asynchronous polling thread.
46 class DataInputStreamWithTimeout implements Closeable {
48 private static final int QUEUE_SIZE = 512;
49 private static final int BUFFER_SIZE = 512;
50 private static final int SLEEP_INTERVAL_MSECS = 50;
52 // special character that marks the first and last byte of a slip message
53 private static final byte SLIP_MARK = (byte) 0xc0;
54 private static final byte SLIP_PROT = 0;
56 private final Logger logger = LoggerFactory.getLogger(DataInputStreamWithTimeout.class);
58 private final Queue<byte[]> slipMessageQueue = new ConcurrentLinkedQueue<>();
60 private InputStream inputStream;
62 private @Nullable String pollException = null;
63 private @Nullable Poller pollRunner = null;
64 private ExecutorService executor;
66 private class Poller implements Callable<Boolean> {
68 private boolean interrupted = false;
69 private Future<Boolean> pollerFinished;
71 public Poller(ExecutorService executor) {
72 logger.trace("Poller: created");
73 pollerFinished = executor.submit(this);
76 public void interrupt() {
80 } catch (InterruptedException | ExecutionException e) {
85 * Task that loops to read bytes from {@link InputStream} and build SLIP packets from them. The SLIP packets are
86 * placed in a {@link ConcurrentLinkedQueue}. It loops continuously until 'interrupt()' or 'Thread.interrupt()'
87 * is called, whereupon it terminates early after the next socket read timeout.
90 public Boolean call() throws Exception {
91 logger.trace("Poller.call(): started");
92 byte[] buf = new byte[BUFFER_SIZE];
96 // clean start, no exception, empty queue
98 slipMessageQueue.clear();
100 // loop forever; on shutdown interrupt() gets called to break out of the loop
104 // fully flush the input buffer
105 inputStream.readAllBytes();
108 byt = inputStream.read();
110 // end of stream is OK => keep on polling
114 if ((i > 0) && (buf[i] == SLIP_MARK)) {
115 // the minimal slip message is 7 bytes [MM PP LL CC CC KK MM]
116 if ((i > 5) && (buf[0] == SLIP_MARK) && (buf[1] == SLIP_PROT)) {
117 slipMessageQueue.offer(Arrays.copyOfRange(buf, 0, i + 1));
118 if (slipMessageQueue.size() > QUEUE_SIZE) {
119 logger.warn("Poller.call(): slip message queue overflow => PLEASE REPORT !!");
120 slipMessageQueue.poll();
124 logger.warn("Poller.call(): non slip messsage discarded => PLEASE REPORT !!");
130 if (++i >= BUFFER_SIZE) {
131 logger.warn("Poller.call(): input buffer overrun => PLEASE REPORT !!");
134 } catch (SocketTimeoutException e) {
135 // socket read timeouts are OK => keep on polling, unless interrupted
140 } catch (IOException e) {
141 // any other exception => stop polling
142 String msg = e.getMessage();
143 pollException = msg != null ? msg : "Generic IOException";
144 logger.debug("Poller.call(): stopping '{}'", pollException);
149 logger.trace("Poller.call(): ended");
150 // we only get here if a shutdown or an error occurs, so free ourselves so that we can be recreated
157 * Check if there was an exception on the polling loop task and if so, throw it back on the caller thread.
159 * @throws IOException
161 private void throwIfPollException() throws IOException {
162 if (pollException != null) {
163 logger.debug("passPollException() polling loop exception {}", pollException);
164 throw new IOException(pollException);
169 * Creates a {@link DataInputStreamWithTimeout} as a wrapper around the specified underlying {@link InputStream}
171 * @param stream the specified input stream
172 * @param bridge the actual Bridge Thing instance
174 public DataInputStreamWithTimeout(InputStream stream, VeluxBridgeHandler bridge) {
175 inputStream = stream;
176 executor = Executors.newSingleThreadExecutor(bridge.getThreadFactory());
180 * Overridden method of {@link Closeable} interface. Stops the polling thread.
182 * @throws IOException
185 public void close() throws IOException {
190 * Reads and removes the next available SLIP message from the queue. If the queue is empty, continue polling
191 * until either a message is found, or the timeout expires.
193 * @param timeoutMSecs the timeout period in milliseconds.
194 * @return the next SLIP message if there is one on the queue, or an empty byte[] array if not.
195 * @throws IOException
197 public synchronized byte[] readSlipMessage(int timeoutMSecs) throws IOException {
199 int i = (timeoutMSecs / SLEEP_INTERVAL_MSECS) + 1;
202 byte[] slip = slipMessageQueue.remove();
203 logger.trace("readSlipMessage() => return slip message");
205 } catch (NoSuchElementException e) {
206 // queue empty, wait and continue
208 throwIfPollException();
210 Thread.sleep(SLEEP_INTERVAL_MSECS);
211 } catch (InterruptedException e) {
212 logger.debug("readSlipMessage() => thread interrupt");
215 logger.debug("readSlipMessage() => no slip message after {}mS => time out", timeoutMSecs);
220 * Get the number of incoming messages in the queue
222 * @return the number of incoming messages in the queue
224 public int available() {
225 int size = slipMessageQueue.size();
226 logger.trace("available() => slip message count {}", size);
233 public void flush() {
234 logger.trace("flush() called");
235 slipMessageQueue.clear();
239 * Start the polling task
241 private void startPolling() {
242 if (pollRunner == null) {
243 logger.trace("startPolling()");
244 pollRunner = new Poller(executor);
249 * Stop the polling task
251 private void stopPolling() {
252 Poller pollRunner = this.pollRunner;
253 if (pollRunner != null) {
254 logger.trace("stopPolling()");
255 pollRunner.interrupt();