]> git.basschouten.com Git - openhab-addons.git/blob
f388e573bf59949d4089ad40a59f930e1595d308
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2021 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.velux.internal.bridge.slip.io;
14
15 import java.io.Closeable;
16 import java.io.IOException;
17 import java.io.InputStream;
18 import java.net.SocketTimeoutException;
19 import java.util.Arrays;
20 import java.util.NoSuchElementException;
21 import java.util.Queue;
22 import java.util.concurrent.Callable;
23 import java.util.concurrent.ConcurrentLinkedQueue;
24 import java.util.concurrent.ExecutionException;
25 import java.util.concurrent.ExecutorService;
26 import java.util.concurrent.Executors;
27 import java.util.concurrent.Future;
28
29 import org.eclipse.jdt.annotation.NonNullByDefault;
30 import org.eclipse.jdt.annotation.Nullable;
31 import org.openhab.binding.velux.internal.handler.VeluxBridgeHandler;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34
35 /**
36  * This is an wrapper around {@link java.io.InputStream} to support socket receive operations.
37  *
38  * It implements a secondary polling thread to asynchronously read bytes from the socket input stream into a buffer. And
39  * it parses the bytes into SLIP messages, which are placed on a message queue. Callers can access the SLIP messages in
40  * this queue independently from the polling thread.
41  *
42  * @author Guenther Schreiner - Initial contribution.
43  * @author Andrew Fiddian-Green - Complete rewrite using asynchronous polling thread.
44  */
45 @NonNullByDefault
46 class DataInputStreamWithTimeout implements Closeable {
47
48     private static final int QUEUE_SIZE = 512;
49     private static final int BUFFER_SIZE = 512;
50     private static final int SLEEP_INTERVAL_MSECS = 50;
51
52     // special character that marks the first and last byte of a slip message
53     private static final byte SLIP_MARK = (byte) 0xc0;
54     private static final byte SLIP_PROT = 0;
55
56     private final Logger logger = LoggerFactory.getLogger(DataInputStreamWithTimeout.class);
57
58     private final Queue<byte[]> slipMessageQueue = new ConcurrentLinkedQueue<>();
59
60     private InputStream inputStream;
61
62     private @Nullable String pollException = null;
63     private @Nullable Poller pollRunner = null;
64     private ExecutorService executor;
65
66     private class Poller implements Callable<Boolean> {
67
68         private boolean interrupted = false;
69         private Future<Boolean> pollerFinished;
70
71         public Poller(ExecutorService executor) {
72             logger.trace("Poller: created");
73             pollerFinished = executor.submit(this);
74         }
75
76         public void interrupt() {
77             interrupted = true;
78             try {
79                 pollerFinished.get();
80             } catch (InterruptedException | ExecutionException e) {
81             }
82         }
83
84         /**
85          * Task that loops to read bytes from {@link InputStream} and build SLIP packets from them. The SLIP packets are
86          * placed in a {@link ConcurrentLinkedQueue}. It loops continuously until 'interrupt()' or 'Thread.interrupt()'
87          * are called when terminates early after the next socket read timeout.
88          */
89         @Override
90         public Boolean call() throws Exception {
91             logger.trace("Poller.call(): started");
92             byte[] buf = new byte[BUFFER_SIZE];
93             int byt;
94             int i = 0;
95
96             // clean start, no exception, empty queue
97             pollException = null;
98             slipMessageQueue.clear();
99
100             // loop forever; on shutdown interrupt() gets called to break out of the loop
101             while (true) {
102                 try {
103                     if (interrupted) {
104                         // fully flush the input buffer
105                         inputStream.readAllBytes();
106                         break;
107                     }
108                     byt = inputStream.read();
109                     if (byt < 0) {
110                         // end of stream is OK => keep on polling
111                         continue;
112                     }
113                     buf[i] = (byte) byt;
114                     if ((i > 0) && (buf[i] == SLIP_MARK)) {
115                         // the minimal slip message is 7 bytes [MM PP LL CC CC KK MM]
116                         if ((i > 5) && (buf[0] == SLIP_MARK) && (buf[1] == SLIP_PROT)) {
117                             slipMessageQueue.offer(Arrays.copyOfRange(buf, 0, i + 1));
118                             if (slipMessageQueue.size() > QUEUE_SIZE) {
119                                 logger.warn("Poller.call(): slip message queue overflow => PLEASE REPORT !!");
120                                 slipMessageQueue.poll();
121                             }
122                             i = 0;
123                         } else {
124                             logger.warn("Poller.call(): non slip messsage discarded => PLEASE REPORT !!");
125                             buf[0] = SLIP_MARK;
126                             i = 1;
127                         }
128                         continue;
129                     }
130                     if (++i >= BUFFER_SIZE) {
131                         logger.warn("Poller.call(): input buffer overrun => PLEASE REPORT !!");
132                         i = 0;
133                     }
134                 } catch (SocketTimeoutException e) {
135                     // socket read time outs are OK => keep on polling; unless interrupted
136                     if (interrupted) {
137                         break;
138                     }
139                     continue;
140                 } catch (IOException e) {
141                     // any other exception => stop polling
142                     String msg = e.getMessage();
143                     pollException = msg != null ? msg : "Generic IOException";
144                     logger.debug("Poller.call(): stopping '{}'", pollException);
145                     break;
146                 }
147             }
148
149             logger.trace("Poller.call(): ended");
150             // we only get here if shutdown or an error occurs so free ourself so we can be recreated again
151             pollRunner = null;
152             return true;
153         }
154     }
155
156     /**
157      * Check if there was an exception on the polling loop task and if so, throw it back on the caller thread.
158      *
159      * @throws IOException
160      */
161     private void throwIfPollException() throws IOException {
162         if (pollException != null) {
163             logger.debug("passPollException() polling loop exception {}", pollException);
164             throw new IOException(pollException);
165         }
166     }
167
168     /**
169      * Creates a {@link DataInputStreamWithTimeout} as a wrapper around the specified underlying {@link InputStream}
170      *
171      * @param stream the specified input stream
172      * @param bridge the actual Bridge Thing instance
173      */
174     public DataInputStreamWithTimeout(InputStream stream, VeluxBridgeHandler bridge) {
175         inputStream = stream;
176         executor = Executors.newSingleThreadExecutor(bridge.getThreadFactory());
177     }
178
179     /**
180      * Overridden method of {@link Closeable} interface. Stops the polling thread.
181      *
182      * @throws IOException
183      */
184     @Override
185     public void close() throws IOException {
186         stopPolling();
187     }
188
189     /**
190      * Reads and removes the next available SLIP message from the queue. If the queue is empty, continue polling
191      * until either a message is found, or the timeout expires.
192      *
193      * @param timeoutMSecs the timeout period in milliseconds.
194      * @return the next SLIP message if there is one on the queue, or any empty byte[] array if not.
195      * @throws IOException
196      */
197     public synchronized byte[] readSlipMessage(int timeoutMSecs) throws IOException {
198         startPolling();
199         int i = (timeoutMSecs / SLEEP_INTERVAL_MSECS) + 1;
200         while (i-- >= 0) {
201             try {
202                 byte[] slip = slipMessageQueue.remove();
203                 logger.trace("readSlipMessage() => return slip message");
204                 return slip;
205             } catch (NoSuchElementException e) {
206                 // queue empty, wait and continue
207             }
208             throwIfPollException();
209             try {
210                 Thread.sleep(SLEEP_INTERVAL_MSECS);
211             } catch (InterruptedException e) {
212                 logger.debug("readSlipMessage() => thread interrupt");
213             }
214         }
215         logger.debug("readSlipMessage() => no slip message after {}mS => time out", timeoutMSecs);
216         return new byte[0];
217     }
218
219     /**
220      * Get the number of incoming messages in the queue
221      *
222      * @return the number of incoming messages in the queue
223      */
224     public int available() {
225         int size = slipMessageQueue.size();
226         logger.trace("available() => slip message count {}", size);
227         return size;
228     }
229
230     /**
231      * Clear the queue
232      */
233     public void flush() {
234         logger.trace("flush() called");
235         slipMessageQueue.clear();
236     }
237
238     /**
239      * Start the polling task
240      */
241     private void startPolling() {
242         if (pollRunner == null) {
243             logger.trace("startPolling()");
244             pollRunner = new Poller(executor);
245         }
246     }
247
248     /**
249      * Stop the polling task
250      */
251     private void stopPolling() {
252         Poller pollRunner = this.pollRunner;
253         if (pollRunner != null) {
254             logger.trace("stopPolling()");
255             pollRunner.interrupt();
256         }
257         executor.shutdown();
258     }
259 }