]> git.basschouten.com Git - openhab-addons.git/blob
230c9170629389fd5a1c191c72f4edf88d6f71ae
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2021 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.velux.internal.bridge.slip.io;
14
15 import java.io.Closeable;
16 import java.io.IOException;
17 import java.io.InputStream;
18 import java.net.SocketTimeoutException;
19 import java.util.Arrays;
20 import java.util.NoSuchElementException;
21 import java.util.Queue;
22 import java.util.concurrent.Callable;
23 import java.util.concurrent.ConcurrentLinkedQueue;
24 import java.util.concurrent.ExecutionException;
25 import java.util.concurrent.ExecutorService;
26 import java.util.concurrent.Executors;
27 import java.util.concurrent.Future;
28
29 import org.eclipse.jdt.annotation.NonNullByDefault;
30 import org.eclipse.jdt.annotation.Nullable;
31 import org.openhab.binding.velux.internal.handler.VeluxBridgeHandler;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34
35 /**
36  * This is an wrapper around {@link java.io.InputStream} to support socket receive operations.
37  *
38  * It implements a secondary polling thread to asynchronously read bytes from the socket input stream into a buffer. And
39  * it parses the bytes into SLIP messages, which are placed on a message queue. Callers can access the SLIP messages in
40  * this queue independently from the polling thread.
41  *
42  * @author Guenther Schreiner - Initial contribution.
43  * @author Andrew Fiddian-Green - Complete rewrite using asynchronous polling thread.
44  */
45 @NonNullByDefault
46 class DataInputStreamWithTimeout implements Closeable {
47
48     private static final int QUEUE_SIZE = 512;
49     private static final int BUFFER_SIZE = 512;
50     private static final int SLEEP_INTERVAL_MSECS = 50;
51
52     // special character that marks the first and last byte of a slip message
53     private static final byte SLIP_MARK = (byte) 0xc0;
54     private static final byte SLIP_PROT = 0;
55
56     private final Logger logger = LoggerFactory.getLogger(DataInputStreamWithTimeout.class);
57
58     private final Queue<byte[]> slipMessageQueue = new ConcurrentLinkedQueue<>();
59
60     private InputStream inputStream;
61
62     private @Nullable String pollException = null;
63     private @Nullable Poller pollRunner = null;
64     private ExecutorService executor;
65
66     private class Poller implements Callable<Boolean> {
67
68         private boolean interrupted = false;
69         private Future<Boolean> pollerFinished;
70
71         public Poller(ExecutorService executor) {
72             logger.trace("Poller: created");
73             pollerFinished = executor.submit(this);
74         }
75
76         public void interrupt() {
77             interrupted = true;
78             try {
79                 pollerFinished.get();
80             } catch (InterruptedException | ExecutionException e) {
81             }
82         }
83
84         /**
85          * Task that loops to read bytes from {@link InputStream} and build SLIP packets from them. The SLIP packets are
86          * placed in a {@link ConcurrentLinkedQueue}. It loops continuously until 'interrupt()' or 'Thread.interrupt()'
87          * are called when terminates early after the next socket read timeout.
88          */
89         @Override
90         public Boolean call() throws Exception {
91             logger.trace("Poller.call(): started");
92             byte[] buf = new byte[BUFFER_SIZE];
93             int byt;
94             int i = 0;
95
96             // clean start, no exception, empty queue
97             pollException = null;
98             slipMessageQueue.clear();
99
100             // loop forever or until externally interrupted
101             while (!Thread.interrupted()) {
102                 try {
103                     if (interrupted) {
104                         // fully flush the input buffer
105                         inputStream.readAllBytes();
106                         break;
107                     }
108                     byt = inputStream.read();
109                     if (byt < 0) {
110                         // end of stream is OK => keep on polling
111                         continue;
112                     }
113                     buf[i] = (byte) byt;
114                     if ((i > 0) && (buf[i] == SLIP_MARK)) {
115                         // the minimal slip message is 7 bytes [MM PP LL CC CC KK MM]
116                         if ((i > 5) && (buf[0] == SLIP_MARK) && (buf[1] == SLIP_PROT)) {
117                             slipMessageQueue.offer(Arrays.copyOfRange(buf, 0, i + 1));
118                             if (slipMessageQueue.size() > QUEUE_SIZE) {
119                                 logger.warn("Poller.call(): slip message queue overflow => PLEASE REPORT !!");
120                                 slipMessageQueue.poll();
121                             }
122                             i = 0;
123                         } else {
124                             logger.warn("Poller.call(): non slip messsage discarded => PLEASE REPORT !!");
125                             buf[0] = SLIP_MARK;
126                             i = 1;
127                         }
128                         continue;
129                     }
130                     if (++i >= BUFFER_SIZE) {
131                         logger.warn("Poller.call(): input buffer overrun => PLEASE REPORT !!");
132                         i = 0;
133                     }
134                 } catch (SocketTimeoutException e) {
135                     // socket read time outs are OK => keep on polling
136                     continue;
137                 } catch (IOException e) {
138                     // any other exception => stop polling
139                     String msg = e.getMessage();
140                     pollException = msg != null ? msg : "Generic IOException";
141                     logger.debug("Poller.call(): stopping '{}'", pollException);
142                     break;
143                 }
144             }
145
146             logger.trace("Poller.call(): ended");
147             // we only get here if shutdown or an error occurs so free ourself so we can be recreated again
148             pollRunner = null;
149             return true;
150         }
151     }
152
153     /**
154      * Check if there was an exception on the polling loop task and if so, throw it back on the caller thread.
155      *
156      * @throws IOException
157      */
158     private void throwIfPollException() throws IOException {
159         if (pollException != null) {
160             logger.debug("passPollException() polling loop exception {}", pollException);
161             throw new IOException(pollException);
162         }
163     }
164
165     /**
166      * Creates a {@link DataInputStreamWithTimeout} as a wrapper around the specified underlying {@link InputStream}
167      *
168      * @param stream the specified input stream
169      * @param bridge the actual Bridge Thing instance
170      */
171     public DataInputStreamWithTimeout(InputStream stream, VeluxBridgeHandler bridge) {
172         inputStream = stream;
173         executor = Executors.newSingleThreadExecutor(bridge.getThreadFactory());
174     }
175
176     /**
177      * Overridden method of {@link Closeable} interface. Stops the polling thread.
178      *
179      * @throws IOException
180      */
181     @Override
182     public void close() throws IOException {
183         stopPolling();
184     }
185
186     /**
187      * Reads and removes the next available SLIP message from the queue. If the queue is empty, continue polling
188      * until either a message is found, or the timeout expires.
189      *
190      * @param timeoutMSecs the timeout period in milliseconds.
191      * @return the next SLIP message if there is one on the queue, or any empty byte[] array if not.
192      * @throws IOException
193      */
194     public synchronized byte[] readSlipMessage(int timeoutMSecs) throws IOException {
195         startPolling();
196         int i = (timeoutMSecs / SLEEP_INTERVAL_MSECS) + 1;
197         while (i-- >= 0) {
198             try {
199                 byte[] slip = slipMessageQueue.remove();
200                 logger.trace("readSlipMessage() => return slip message");
201                 return slip;
202             } catch (NoSuchElementException e) {
203                 // queue empty, wait and continue
204             }
205             throwIfPollException();
206             try {
207                 Thread.sleep(SLEEP_INTERVAL_MSECS);
208             } catch (InterruptedException e) {
209                 logger.debug("readSlipMessage() => thread interrupt");
210                 throw new IOException("Thread Interrupted");
211             }
212         }
213         logger.debug("readSlipMessage() => no slip message after {}mS => time out", timeoutMSecs);
214         return new byte[0];
215     }
216
217     /**
218      * Get the number of incoming messages in the queue
219      *
220      * @return the number of incoming messages in the queue
221      */
222     public int available() {
223         int size = slipMessageQueue.size();
224         logger.trace("available() => slip message count {}", size);
225         return size;
226     }
227
228     /**
229      * Clear the queue
230      */
231     public void flush() {
232         logger.trace("flush() called");
233         slipMessageQueue.clear();
234     }
235
236     /**
237      * Start the polling task
238      */
239     private void startPolling() {
240         if (pollRunner == null) {
241             logger.trace("startPolling()");
242             pollRunner = new Poller(executor);
243         }
244     }
245
246     /**
247      * Stop the polling task
248      */
249     private void stopPolling() {
250         Poller pollRunner = this.pollRunner;
251         if (pollRunner != null) {
252             logger.trace("stopPolling()");
253             pollRunner.interrupt();
254         }
255         executor.shutdown();
256     }
257 }