]> git.basschouten.com Git - openhab-addons.git/blob
6dfa497b844dd591f1d7f8a47795041a7994eccb
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2023 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.velux.internal.bridge.slip.io;
14
15 import java.io.Closeable;
16 import java.io.IOException;
17 import java.io.InputStream;
18 import java.util.NoSuchElementException;
19 import java.util.Queue;
20 import java.util.concurrent.ConcurrentLinkedQueue;
21 import java.util.concurrent.ExecutionException;
22 import java.util.concurrent.ExecutorService;
23 import java.util.concurrent.Executors;
24 import java.util.concurrent.Future;
25 import java.util.concurrent.TimeUnit;
26 import java.util.concurrent.TimeoutException;
27
28 import org.eclipse.jdt.annotation.NonNullByDefault;
29 import org.eclipse.jdt.annotation.Nullable;
30 import org.openhab.binding.velux.internal.handler.VeluxBridgeHandler;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
33
34 /**
35  * This is a wrapper around {@link java.io.InputStream} to support socket receive operations.
36  *
37  * It implements a secondary polling thread to asynchronously read bytes from the socket input stream into a buffer. And
38  * it parses the bytes into SLIP messages, which are placed on a message queue. Callers can access the SLIP messages in
39  * this queue independently from the polling thread.
40  *
41  * @author Guenther Schreiner - Initial contribution.
42  * @author Andrew Fiddian-Green - Complete rewrite using asynchronous polling thread.
43  */
44 @NonNullByDefault
45 class DataInputStreamWithTimeout implements Closeable {
46
47     private static final int SLEEP_INTERVAL_MSECS = 50;
48     private static final long MAX_WAIT_SECONDS = 15;
49
50     private final Logger logger = LoggerFactory.getLogger(DataInputStreamWithTimeout.class);
51
52     private final Queue<byte[]> slipMessageQueue = new ConcurrentLinkedQueue<>();
53     private final InputStream inputStream;
54     private final VeluxBridgeHandler bridge;
55
56     private @Nullable Poller poller;
57     private @Nullable Future<Boolean> future;
58     private @Nullable ExecutorService executor;
59
60     /**
61      * Creates a {@link DataInputStreamWithTimeout} as a wrapper around the specified underlying {@link InputStream}
62      *
63      * @param inputStream the specified input stream
64      * @param bridge the actual Bridge Thing instance
65      */
66     public DataInputStreamWithTimeout(InputStream inputStream, VeluxBridgeHandler bridge) {
67         this.inputStream = inputStream;
68         this.bridge = bridge;
69     }
70
71     /**
72      * Overridden method of {@link Closeable} interface. Stops the polling task.
73      *
74      * @throws IOException (although actually no exceptions are thrown)
75      */
76     @Override
77     public void close() throws IOException {
78         stopPolling();
79     }
80
81     /**
82      * Reads and removes the next available SLIP message from the queue. If the queue is empty, continue polling
83      * until either a message is found, or the timeout expires.
84      *
85      * @param timeoutMSecs the timeout period in milliseconds.
86      * @return the next SLIP message if there is one on the queue, or any empty byte[] array if not.
87      * @throws IOException if the poller task has unexpectedly terminated e.g. via an IOException, or if either the
88      *             poller task, or the calling thread have been interrupted
89      */
90     public synchronized byte[] readSlipMessage(int timeoutMSecs) throws IOException {
91         startPolling();
92         int i = (timeoutMSecs / SLEEP_INTERVAL_MSECS) + 1;
93         while (i-- >= 0) {
94             try {
95                 byte[] slip = slipMessageQueue.remove();
96                 logger.trace("readSlipMessage() => return slip message");
97                 return slip;
98             } catch (NoSuchElementException e) {
99                 // queue empty, fall through and continue
100             }
101             try {
102                 Future<Boolean> future = this.future;
103                 if ((future != null) && future.isDone()) {
104                     future.get(); // throws ExecutionException, InterruptedException
105                     // future terminated without exception, but prematurely, which is itself an exception
106                     throw new IOException("Poller thread terminated prematurely");
107                 }
108                 Thread.sleep(SLEEP_INTERVAL_MSECS); // throws InterruptedException
109             } catch (ExecutionException | InterruptedException e) {
110                 // re-cast other exceptions as IOException
111                 throw new IOException(e);
112             }
113         }
114         logger.debug("readSlipMessage() => no slip message");
115         return new byte[0];
116     }
117
118     /**
119      * Get the number of incoming messages in the queue
120      *
121      * @return the number of incoming messages in the queue
122      */
123     public int available() {
124         int size = slipMessageQueue.size();
125         logger.trace("available() => slip message count {}", size);
126         return size;
127     }
128
129     /**
130      * Clear the queue
131      */
132     public void flush() {
133         logger.trace("flush() called");
134         slipMessageQueue.clear();
135     }
136
137     /**
138      * Start the polling task
139      */
140     private void startPolling() {
141         if (future == null) {
142             logger.debug("startPolling() called");
143             slipMessageQueue.clear();
144             poller = new Poller(inputStream, slipMessageQueue);
145             ExecutorService executor = this.executor = Executors.newSingleThreadExecutor(bridge.getThreadFactory());
146             future = executor.submit(poller);
147         }
148     }
149
150     /**
151      * Stop the polling task
152      */
153     private void stopPolling() {
154         logger.debug("stopPolling() called");
155
156         Poller poller = this.poller;
157         Future<Boolean> future = this.future;
158         ExecutorService executor = this.executor;
159
160         this.poller = null;
161         this.future = null;
162         this.executor = null;
163
164         if (executor != null) {
165             executor.shutdown();
166         }
167         if (poller != null) {
168             poller.interrupt();
169         }
170         if (future != null) {
171             try {
172                 future.get(MAX_WAIT_SECONDS, TimeUnit.SECONDS);
173             } catch (ExecutionException e) {
174                 // expected exception due to e.g. IOException on socket close
175             } catch (TimeoutException | InterruptedException e) {
176                 // unexpected exception due to e.g. KLF200 'zombie state'
177                 logger.warn("stopPolling() exception '{}' => PLEASE REPORT !!", e.getMessage());
178             }
179         }
180     }
181 }