2 * Copyright (c) 2010-2023 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.velux.internal.bridge.slip.io;
15 import java.io.Closeable;
16 import java.io.IOException;
17 import java.io.InputStream;
18 import java.util.NoSuchElementException;
19 import java.util.Queue;
20 import java.util.concurrent.ConcurrentLinkedQueue;
21 import java.util.concurrent.ExecutionException;
22 import java.util.concurrent.ExecutorService;
23 import java.util.concurrent.Executors;
24 import java.util.concurrent.Future;
25 import java.util.concurrent.TimeUnit;
26 import java.util.concurrent.TimeoutException;
28 import org.eclipse.jdt.annotation.NonNullByDefault;
29 import org.eclipse.jdt.annotation.Nullable;
30 import org.openhab.binding.velux.internal.handler.VeluxBridgeHandler;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
35 * This is a wrapper around {@link java.io.InputStream} to support socket receive operations.
37 * It implements a secondary polling thread to asynchronously read bytes from the socket input stream into a buffer. And
38 * it parses the bytes into SLIP messages, which are placed on a message queue. Callers can access the SLIP messages in
39 * this queue independently from the polling thread.
41 * @author Guenther Schreiner - Initial contribution.
42 * @author Andrew Fiddian-Green - Complete rewrite using asynchronous polling thread.
45 class DataInputStreamWithTimeout implements Closeable {
47 private static final int SLEEP_INTERVAL_MSECS = 50;
48 private static final long MAX_WAIT_SECONDS = 15;
50 private final Logger logger = LoggerFactory.getLogger(DataInputStreamWithTimeout.class);
52 private final Queue<byte[]> slipMessageQueue = new ConcurrentLinkedQueue<>();
53 private final InputStream inputStream;
54 private final VeluxBridgeHandler bridge;
56 private @Nullable Poller poller;
57 private @Nullable Future<Boolean> future;
58 private @Nullable ExecutorService executor;
61 * Creates a {@link DataInputStreamWithTimeout} as a wrapper around the specified underlying {@link InputStream}
63 * @param inputStream the specified input stream
64 * @param bridge the actual Bridge Thing instance
66 public DataInputStreamWithTimeout(InputStream inputStream, VeluxBridgeHandler bridge) {
67 this.inputStream = inputStream;
72 * Overridden method of {@link Closeable} interface. Stops the polling task.
74 * @throws IOException (although actually no exceptions are thrown)
77 public void close() throws IOException {
82 * Reads and removes the next available SLIP message from the queue. If the queue is empty, continue polling
83 * until either a message is found, or the timeout expires.
85 * @param timeoutMSecs the timeout period in milliseconds.
86 * @return the next SLIP message if there is one on the queue, or any empty byte[] array if not.
87 * @throws IOException if the poller task has unexpectedly terminated e.g. via an IOException, or if either the
88 * poller task, or the calling thread have been interrupted
90 public synchronized byte[] readSlipMessage(int timeoutMSecs) throws IOException {
92 int i = (timeoutMSecs / SLEEP_INTERVAL_MSECS) + 1;
95 byte[] slip = slipMessageQueue.remove();
96 logger.trace("readSlipMessage() => return slip message");
98 } catch (NoSuchElementException e) {
99 // queue empty, fall through and continue
102 Future<Boolean> future = this.future;
103 if ((future != null) && future.isDone()) {
104 future.get(); // throws ExecutionException, InterruptedException
105 // future terminated without exception, but prematurely, which is itself an exception
106 throw new IOException("Poller thread terminated prematurely");
108 Thread.sleep(SLEEP_INTERVAL_MSECS); // throws InterruptedException
109 } catch (ExecutionException | InterruptedException e) {
110 // re-cast other exceptions as IOException
111 throw new IOException(e);
114 logger.debug("readSlipMessage() => no slip message");
119 * Get the number of incoming messages in the queue
121 * @return the number of incoming messages in the queue
123 public int available() {
124 int size = slipMessageQueue.size();
125 logger.trace("available() => slip message count {}", size);
132 public void flush() {
133 logger.trace("flush() called");
134 slipMessageQueue.clear();
138 * Start the polling task
140 private void startPolling() {
141 if (future == null) {
142 logger.debug("startPolling() called");
143 slipMessageQueue.clear();
144 poller = new Poller(inputStream, slipMessageQueue);
145 ExecutorService executor = this.executor = Executors.newSingleThreadExecutor(bridge.getThreadFactory());
146 future = executor.submit(poller);
151 * Stop the polling task
153 private void stopPolling() {
154 logger.debug("stopPolling() called");
156 Poller poller = this.poller;
157 Future<Boolean> future = this.future;
158 ExecutorService executor = this.executor;
162 this.executor = null;
164 if (executor != null) {
167 if (poller != null) {
170 if (future != null) {
172 future.get(MAX_WAIT_SECONDS, TimeUnit.SECONDS);
173 } catch (ExecutionException e) {
174 // expected exception due to e.g. IOException on socket close
175 } catch (TimeoutException | InterruptedException e) {
176 // unexpected exception due to e.g. KLF200 'zombie state'
177 logger.warn("stopPolling() exception '{}' => PLEASE REPORT !!", e.getMessage());