import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
// special character that marks the first and last byte of a slip message
private static final byte SLIP_MARK = (byte) 0xc0;
+ private static final byte SLIP_PROT = 0;
private final Logger logger = LoggerFactory.getLogger(DataInputStreamWithTimeout.class);
private class Poller implements Callable<Boolean> {
private boolean interrupted = false;
+ private Future<Boolean> pollerFinished;
+
+ public Poller(ExecutorService executor) {
+ logger.trace("Poller: created");
+ pollerFinished = executor.submit(this);
+ }
public void interrupt() {
interrupted = true;
+ try {
+ pollerFinished.get();
+ } catch (InterruptedException | ExecutionException e) {
+ }
}
/**
*/
@Override
public Boolean call() throws Exception {
+ logger.trace("Poller.call(): started");
byte[] buf = new byte[BUFFER_SIZE];
- byte byt;
+ int byt;
int i = 0;
// clean start, no exception, empty queue
pollException = null;
slipMessageQueue.clear();
- // loop forever or until internally or externally interrupted
- while ((!interrupted) && (!Thread.interrupted())) {
+ // loop forever or until externally interrupted
+ while (!Thread.interrupted()) {
try {
- buf[i] = byt = (byte) inputStream.read();
- if (byt == SLIP_MARK) {
- if (i > 0) {
- // the minimal slip message is 7 bytes [MM PP LL CC CC KK MM]
- if ((i > 5) && (buf[0] == SLIP_MARK)) {
- slipMessageQueue.offer(Arrays.copyOfRange(buf, 0, i + 1));
- if (slipMessageQueue.size() > QUEUE_SIZE) {
- logger.warn("pollRunner() => slip message queue overflow => PLEASE REPORT !!");
- slipMessageQueue.poll();
- }
+ if (interrupted) {
+ // fully flush the input buffer
+ inputStream.readAllBytes();
+ break;
+ }
+ byt = inputStream.read();
+ if (byt < 0) {
+ // end of stream is OK => keep on polling
+ continue;
+ }
+ buf[i] = (byte) byt;
+ if ((i > 0) && (buf[i] == SLIP_MARK)) {
+ // the minimal slip message is 7 bytes [MM PP LL CC CC KK MM]
+ if ((i > 5) && (buf[0] == SLIP_MARK) && (buf[1] == SLIP_PROT)) {
+ slipMessageQueue.offer(Arrays.copyOfRange(buf, 0, i + 1));
+ if (slipMessageQueue.size() > QUEUE_SIZE) {
+ logger.warn("Poller.call(): slip message queue overflow => PLEASE REPORT !!");
+ slipMessageQueue.poll();
}
i = 0;
+ } else {
+ logger.warn("Poller.call(): non slip messsage discarded => PLEASE REPORT !!");
buf[0] = SLIP_MARK;
- continue;
+ i = 1;
}
+ continue;
}
if (++i >= BUFFER_SIZE) {
+ logger.warn("Poller.call(): input buffer overrun => PLEASE REPORT !!");
i = 0;
}
} catch (SocketTimeoutException e) {
// any other exception => stop polling
String msg = e.getMessage();
pollException = msg != null ? msg : "Generic IOException";
- logger.debug("pollRunner() stopping '{}'", pollException);
+ logger.debug("Poller.call(): stopping '{}'", pollException);
break;
}
}
+ logger.trace("Poller.call(): ended");
// we only get here if shutdown or an error occurs so free ourself so we can be recreated again
pollRunner = null;
return true;
* Start the polling task
*/
private void startPolling() {
- Poller pollRunner = this.pollRunner;
if (pollRunner == null) {
logger.trace("startPolling()");
- pollRunner = this.pollRunner = new Poller();
- executor.submit(pollRunner);
+ pollRunner = new Poller(executor);
}
}
if (pollRunner != null) {
logger.trace("stopPolling()");
pollRunner.interrupt();
- this.pollRunner = null;
}
executor.shutdown();
}