]> git.basschouten.com Git - openhab-addons.git/blob
4116cfd6bb6e8a4ebf2bc0305108a13c550afd3c
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2020 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.velux.internal.bridge.slip.io;
14
15 import java.io.Closeable;
16 import java.io.IOException;
17 import java.io.InputStream;
18 import java.net.SocketTimeoutException;
19 import java.util.Arrays;
20 import java.util.NoSuchElementException;
21 import java.util.Queue;
22 import java.util.concurrent.Callable;
23 import java.util.concurrent.ConcurrentLinkedQueue;
24 import java.util.concurrent.ExecutorService;
25 import java.util.concurrent.Executors;
26
27 import org.eclipse.jdt.annotation.NonNullByDefault;
28 import org.eclipse.jdt.annotation.Nullable;
29 import org.openhab.binding.velux.internal.handler.VeluxBridgeHandler;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32
33 /**
34  * This is an wrapper around {@link java.io.InputStream} to support socket receive operations.
35  *
36  * It implements a secondary polling thread to asynchronously read bytes from the socket input stream into a buffer. And
37  * it parses the bytes into SLIP messages, which are placed on a message queue. Callers can access the SLIP messages in
38  * this queue independently from the polling thread.
39  *
40  * @author Guenther Schreiner - Initial contribution.
41  * @author Andrew Fiddian-Green - Complete rewrite using asynchronous polling thread.
42  */
43 @NonNullByDefault
44 class DataInputStreamWithTimeout implements Closeable {
45
46     private static final int QUEUE_SIZE = 512;
47     private static final int BUFFER_SIZE = 512;
48     private static final int SLEEP_INTERVAL_MSECS = 50;
49
50     // special character that marks the first and last byte of a slip message
51     private static final byte SLIP_MARK = (byte) 0xc0;
52
53     private final Logger logger = LoggerFactory.getLogger(DataInputStreamWithTimeout.class);
54
55     private final Queue<byte[]> slipMessageQueue = new ConcurrentLinkedQueue<>();
56
57     private InputStream inputStream;
58
59     private @Nullable String pollException = null;
60     private @Nullable Poller pollRunner = null;
61     private ExecutorService executor;
62
63     private class Poller implements Callable<Boolean> {
64
65         private boolean interrupted = false;
66
67         public void interrupt() {
68             interrupted = true;
69         }
70
71         /**
72          * Task that loops to read bytes from {@link InputStream} and build SLIP packets from them. The SLIP packets are
73          * placed in a {@link ConcurrentLinkedQueue}. It loops continuously until 'interrupt()' or 'Thread.interrupt()'
74          * are called when terminates early after the next socket read timeout.
75          */
76         @Override
77         public Boolean call() throws Exception {
78             byte[] buf = new byte[BUFFER_SIZE];
79             byte byt;
80             int i = 0;
81
82             // clean start, no exception, empty queue
83             pollException = null;
84             slipMessageQueue.clear();
85
86             // loop forever or until internally or externally interrupted
87             while ((!interrupted) && (!Thread.interrupted())) {
88                 try {
89                     buf[i] = byt = (byte) inputStream.read();
90                     if (byt == SLIP_MARK) {
91                         if (i > 0) {
92                             // the minimal slip message is 7 bytes [MM PP LL CC CC KK MM]
93                             if ((i > 5) && (buf[0] == SLIP_MARK)) {
94                                 slipMessageQueue.offer(Arrays.copyOfRange(buf, 0, i + 1));
95                                 if (slipMessageQueue.size() > QUEUE_SIZE) {
96                                     logger.warn("pollRunner() => slip message queue overflow => PLEASE REPORT !!");
97                                     slipMessageQueue.poll();
98                                 }
99                             }
100                             i = 0;
101                             buf[0] = SLIP_MARK;
102                             continue;
103                         }
104                     }
105                     if (++i >= BUFFER_SIZE) {
106                         i = 0;
107                     }
108                 } catch (SocketTimeoutException e) {
109                     // socket read time outs are OK => keep on polling
110                     continue;
111                 } catch (IOException e) {
112                     // any other exception => stop polling
113                     String msg = e.getMessage();
114                     pollException = msg != null ? msg : "Generic IOException";
115                     logger.debug("pollRunner() stopping '{}'", pollException);
116                     break;
117                 }
118             }
119
120             // we only get here if shutdown or an error occurs so free ourself so we can be recreated again
121             pollRunner = null;
122             return true;
123         }
124     }
125
126     /**
127      * Check if there was an exception on the polling loop task and if so, throw it back on the caller thread.
128      *
129      * @throws IOException
130      */
131     private void throwIfPollException() throws IOException {
132         if (pollException != null) {
133             logger.debug("passPollException() polling loop exception {}", pollException);
134             throw new IOException(pollException);
135         }
136     }
137
138     /**
139      * Creates a {@link DataInputStreamWithTimeout} as a wrapper around the specified underlying {@link InputStream}
140      *
141      * @param stream the specified input stream
142      * @param bridge the actual Bridge Thing instance
143      */
144     public DataInputStreamWithTimeout(InputStream stream, VeluxBridgeHandler bridge) {
145         inputStream = stream;
146         executor = Executors.newSingleThreadExecutor(bridge.getThreadFactory());
147     }
148
149     /**
150      * Overridden method of {@link Closeable} interface. Stops the polling thread.
151      *
152      * @throws IOException
153      */
154     @Override
155     public void close() throws IOException {
156         stopPolling();
157     }
158
159     /**
160      * Reads and removes the next available SLIP message from the queue. If the queue is empty, continue polling
161      * until either a message is found, or the timeout expires.
162      *
163      * @param timeoutMSecs the timeout period in milliseconds.
164      * @return the next SLIP message if there is one on the queue, or any empty byte[] array if not.
165      * @throws IOException
166      */
167     public synchronized byte[] readSlipMessage(int timeoutMSecs) throws IOException {
168         startPolling();
169         int i = (timeoutMSecs / SLEEP_INTERVAL_MSECS) + 1;
170         while (i-- >= 0) {
171             try {
172                 byte[] slip = slipMessageQueue.remove();
173                 logger.trace("readSlipMessage() => return slip message");
174                 return slip;
175             } catch (NoSuchElementException e) {
176                 // queue empty, wait and continue
177             }
178             throwIfPollException();
179             try {
180                 Thread.sleep(SLEEP_INTERVAL_MSECS);
181             } catch (InterruptedException e) {
182                 logger.debug("readSlipMessage() => thread interrupt");
183                 throw new IOException("Thread Interrupted");
184             }
185         }
186         logger.debug("readSlipMessage() => no slip message after {}mS => time out", timeoutMSecs);
187         return new byte[0];
188     }
189
190     /**
191      * Get the number of incoming messages in the queue
192      *
193      * @return the number of incoming messages in the queue
194      */
195     public int available() {
196         int size = slipMessageQueue.size();
197         logger.trace("available() => slip message count {}", size);
198         return size;
199     }
200
201     /**
202      * Clear the queue
203      */
204     public void flush() {
205         logger.trace("flush() called");
206         slipMessageQueue.clear();
207     }
208
209     /**
210      * Start the polling task
211      */
212     private void startPolling() {
213         Poller pollRunner = this.pollRunner;
214         if (pollRunner == null) {
215             logger.trace("startPolling()");
216             pollRunner = this.pollRunner = new Poller();
217             executor.submit(pollRunner);
218         }
219     }
220
221     /**
222      * Stop the polling task
223      */
224     private void stopPolling() {
225         Poller pollRunner = this.pollRunner;
226         if (pollRunner != null) {
227             logger.trace("stopPolling()");
228             pollRunner.interrupt();
229             this.pollRunner = null;
230         }
231         executor.shutdown();
232     }
233 }