2 * Copyright (c) 2010-2023 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.velux.internal.bridge.slip.io;
15 import java.io.IOException;
16 import java.io.InputStream;
17 import java.net.SocketTimeoutException;
18 import java.util.Arrays;
19 import java.util.Queue;
20 import java.util.concurrent.Callable;
22 import org.eclipse.jdt.annotation.NonNullByDefault;
23 import org.eclipse.jdt.annotation.Nullable;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
28 * This class implements a Callable to read SLIP messages from the input stream.
30 * It runs on a secondary polling thread, asynchronously reading bytes from the socket input stream into a buffer.
31 * It then parses the bytes into SLIP messages, which are placed on a message queue.
33 * @author Andrew Fiddian-Green - Initial Contribution; refactored from private class in DataInputStreamWithTimeout
36 public class Poller implements Callable<Boolean> {
// size in bytes of the assembly buffer allocated in call(); reaching it triggers the "input buffer overrun" warning
38 private static final int BUFFER_SIZE = 512;
// number of queued messages above which call() logs the "slip message queue overflow" warning
39 private static final int QUEUE_SIZE = 512;
41 // special character that marks the first and last byte of a slip message
42 private static final byte SLIP_MARK = (byte) 0xc0;
// value that call() requires in the second byte (buf[1]) of a message before it is queued
43 private static final byte SLIP_PROT = 0;
// per-instance SLF4J logger
45 private final Logger logger = LoggerFactory.getLogger(Poller.class);
// stream that call() reads from, one byte at a time
47 private final InputStream inputStream;
// queue that call() offers each accepted message to (as a fresh byte array copy)
48 private final Queue<byte[]> messageQueue;
// thread executing call(); written at the start of call() and read by interrupt()
// NOTE(review): volatile presumably because interrupt() is invoked from a different thread -- confirm against callers
50 private @Nullable volatile Thread thread;
// Creates a poller for the given stream and message queue, and logs a trace message.
// NOTE(review): the statements that store 'stream' and 'queue' are not visible in this excerpt; presumably they
// initialise the final fields inputStream and messageQueue -- confirm.
52 public Poller(InputStream stream, Queue<byte[]> queue) {
53 logger.trace("Poller: created");
// Stop request for the polling task: the documentation of call() states that the task runs until this method
// or Thread.interrupt() is called.
58 public void interrupt() {
// take a local copy of the volatile field so that the null check and any later use see the same reference
59 Thread thread = this.thread;
// act only if call() has already recorded its thread and that thread is still running
60 if ((thread != null) && thread.isAlive()) {
// NOTE(review): the statement inside this guard is not visible in this excerpt; presumably thread.interrupt() -- confirm
66 * Task that loops to read bytes from inputStream and build SLIP packets from them. The SLIP packets are placed in
67 * messageQueue. It runs until 'interrupt()' or 'Thread.interrupt()' is called.
69 * @throws IOException in case of socket read errors
72 public Boolean call() throws IOException {
// record the executing thread so that interrupt() can find it
73 thread = Thread.currentThread();
74 logger.trace("Poller.call(): started");
// assembly buffer for the message currently being received
75 byte[] buf = new byte[BUFFER_SIZE];
// NOTE(review): the declarations of 'byt' (last value read) and 'i' (buffer index) are not visible in this excerpt
// keep polling until the interrupt flag of the executing thread is set
79 while (!Thread.currentThread().isInterrupted()) {
81 byt = inputStream.read(); // throws IOException
82 // end of stream is OK => continue polling
86 } catch (SocketTimeoutException e) {
87 // socket read time out is OK => continue polling
// a mark byte at any index other than 0 ends the sequence of bytes collected so far
91 if ((i > 0) && (buf[i] == SLIP_MARK)) {
92 // the minimal slip message is 7 bytes [MM PP LL CC CC KK MM]
// accept only sequences of at least 7 bytes (i > 5) that start with the mark byte followed by the protocol byte
93 if ((i > 5) && (buf[0] == SLIP_MARK) && (buf[1] == SLIP_PROT)) {
// queue a copy of bytes 0..i inclusive, i.e. including the leading and trailing mark bytes
94 messageQueue.offer(Arrays.copyOfRange(buf, 0, i + 1));
// the size check happens after the offer, so the warning fires once the queue holds more than QUEUE_SIZE messages
95 if (messageQueue.size() > QUEUE_SIZE) {
96 logger.warn("Poller.call(): slip message queue overflow => PLEASE REPORT !!");
// build the hex dump only when warn level logging is enabled
101 if (logger.isWarnEnabled()) {
102 StringBuilder sb = new StringBuilder();
// dump bytes 0..i inclusive as two digit upper case hex values separated by spaces
103 for (int j = 0; j <= i; j++) {
104 sb.append(String.format("%02X ", buf[j]));
106 logger.warn("Poller.call(): non slip messsage {} discarded => PLEASE REPORT !!", sb.toString());
// advance the index; reaching BUFFER_SIZE means the buffer filled up without a complete message
113 if (++i >= BUFFER_SIZE) {
114 logger.warn("Poller.call(): input buffer overrun => PLEASE REPORT !!");
118 logger.trace("Poller.call(): completed");