/**
 * Copyright (c) 2010-2020 Contributors to the openHAB project
 *
 * See the NOTICE file(s) distributed with this work for additional
 * information.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License 2.0 which is available at
 * http://www.eclipse.org/legal/epl-2.0
 *
 * SPDX-License-Identifier: EPL-2.0
 */
package org.openhab.binding.caddx.internal;

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;
import java.util.TooManyListenersException;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.openhab.core.io.transport.serial.PortInUseException;
import org.openhab.core.io.transport.serial.SerialPort;
import org.openhab.core.io.transport.serial.SerialPortEvent;
import org.openhab.core.io.transport.serial.SerialPortEventListener;
import org.openhab.core.io.transport.serial.SerialPortIdentifier;
import org.openhab.core.io.transport.serial.SerialPortManager;
import org.openhab.core.io.transport.serial.UnsupportedCommOperationException;
import org.openhab.core.util.HexUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
42 * The {@link CaddxCommunicator} is responsible for the asynchronous serial communication
44 * @author Georgios Moutsos - Initial contribution
47 public class CaddxCommunicator implements SerialPortEventListener {
48 private final Logger logger = LoggerFactory.getLogger(CaddxCommunicator.class);
50 private final SerialPortManager portManager;
51 private final Set<CaddxPanelListener> listenerQueue = new HashSet<>();
52 private final Deque<CaddxMessage> messages = new LinkedBlockingDeque<>();
53 private final SynchronousQueue<CaddxMessage> exchanger = new SynchronousQueue<>();
54 private final Thread communicator;
55 private final CaddxProtocol protocol;
56 private final String serialPortName;
57 private final int baudRate;
58 private final SerialPort serialPort;
59 private final InputStream in;
60 private final OutputStream out;
62 // Receiver state variables
63 private boolean inMessage = false;
64 private boolean haveFirstByte = false;
65 private int messageBufferLength = 0;
66 private byte[] message;
67 private int messageBufferIndex = 0;
68 private boolean unStuff = false;
69 private int tempAsciiByte = 0;
71 public CaddxCommunicator(SerialPortManager portManager, CaddxProtocol protocol, String serialPortName, int baudRate)
72 throws UnsupportedCommOperationException, PortInUseException, IOException, TooManyListenersException {
73 this.portManager = portManager;
74 this.protocol = protocol;
75 this.serialPortName = serialPortName;
76 this.baudRate = baudRate;
78 SerialPortIdentifier portIdentifier = this.portManager.getIdentifier(serialPortName);
79 if (portIdentifier == null) {
80 throw new IOException("Cannot get the port identifier.");
82 serialPort = portIdentifier.open(this.getClass().getName(), 2000);
83 serialPort.setSerialPortParams(baudRate, SerialPort.DATABITS_8, SerialPort.STOPBITS_1, SerialPort.PARITY_NONE);
84 serialPort.enableReceiveThreshold(1);
85 serialPort.disableReceiveTimeout();
87 InputStream localIn = serialPort.getInputStream();
88 if (localIn == null) {
89 logger.warn("Cannot get the input stream of the serial port");
90 throw new IOException("Input stream is null");
94 OutputStream localOut = serialPort.getOutputStream();
95 if (localOut == null) {
96 logger.warn("Cannot get the output stream of the serial port");
97 throw new IOException("Output stream is null");
101 serialPort.notifyOnDataAvailable(true);
102 serialPort.addEventListener(this);
104 communicator = new Thread(this::messageDispatchLoop, "Caddx Communicator");
105 communicator.setDaemon(true);
106 communicator.start();
108 message = new byte[0];
110 logger.trace("CaddxCommunicator communication thread started successfully for {}", serialPortName);
113 public CaddxProtocol getProtocol() {
117 public String getSerialPortName() {
118 return serialPortName;
121 public int getBaudRate() {
125 public void addListener(CaddxPanelListener listener) {
126 listenerQueue.add(listener);
130 * Send message to panel. Asynchronous, i.e. returns immediately.
131 * Messages are sent only when panel is ready (i.e. sent an
132 * acknowledgment to last message), but no checks are implemented that
133 * the message was correctly received and executed.
135 * @param msg Data to be sent to panel. First byte is message type.
136 * Fletcher sum is computed and appended by transmit.
138 public void transmit(CaddxMessage msg) {
143 * Adds this message before any others in the queue.
144 * Used by receiver to send ACKs.
146 * @param msg The message
148 public void transmitFirst(CaddxMessage msg) {
149 messages.addFirst(msg);
153 logger.trace("CaddxCommunicator stopping");
155 // kick thread out of waiting for FIFO
156 communicator.interrupt();
158 // Close the streams first to unblock blocked reads and writes
161 } catch (IOException e) {
165 } catch (IOException e) {
168 // Wait until communication thread exits
170 communicator.join(3000);
171 } catch (InterruptedException e) {
174 // Also close the serial port
175 serialPort.removeEventListener();
179 @SuppressWarnings("null")
180 private void messageDispatchLoop() {
181 int @Nullable [] expectedMessageNumbers = null;
184 CaddxMessage outgoingMessage = null;
185 boolean skipTransmit = true;
188 // loop until the thread is interrupted, sending out messages
189 while (!Thread.currentThread().isInterrupted()) {
190 // Initialize the state
191 outgoingMessage = null;
192 expectedMessageNumbers = null;
195 // send next outgoing message if we have one
196 outgoingMessage = messages.poll();
197 if (outgoingMessage != null) {
198 logger.trace("CaddxCommunicator.run() Outgoing message: {}", outgoingMessage.getMessageType());
200 byte[] msg = outgoingMessage.getMessageFrameBytes(protocol);
204 expectedMessageNumbers = outgoingMessage.getReplyMessageNumbers();
207 if (logger.isDebugEnabled()) {
208 logger.debug("->: {}", outgoingMessage.getName());
209 logger.debug("->: {}", HexUtils
210 .bytesToHex(outgoingMessage.getMessageFrameBytes(CaddxProtocol.Binary), " "));
214 logger.trace("CaddxCommunicator.run() skipTransmit: true");
215 skipTransmit = false;
218 // Check for an incoming message
219 CaddxMessage incomingMessage = null;
221 incomingMessage = exchanger.poll(3, TimeUnit.SECONDS);
222 } catch (InterruptedException e) {
223 logger.debug("CaddxCommunicator.run() InterruptedException caught.");
224 Thread.currentThread().interrupt();
228 if (incomingMessage == null) {
229 if (expectedMessageNumbers == null) { // Nothing expected, Nothing received we continue
230 logger.trace("CaddxCommunicator.run(): Nothing expected, Nothing received we continue");
234 if (logger.isDebugEnabled()) {
235 logger.debug("<-: {}", incomingMessage.getName());
236 logger.debug("<-: {}",
237 HexUtils.bytesToHex(incomingMessage.getMessageFrameBytes(CaddxProtocol.Binary), " "));
241 // Check if we wait for a reply
242 if (expectedMessageNumbers == null) {
243 if (incomingMessage != null) { // Nothing expected. Message received.
244 logger.trace("CaddxCommunicator.run() Nothing expected, Message received");
246 // Check if Acknowledgement handling is required.
247 if (incomingMessage.hasAcknowledgementFlag()) {
248 if (incomingMessage.isChecksumCorrect()) {
250 transmitFirst(new CaddxMessage(CaddxMessageType.POSITIVE_ACKNOWLEDGE, ""));
253 transmitFirst(new CaddxMessage(CaddxMessageType.NEGATIVE_ACKNOWLEDGE, ""));
258 if (incomingMessage == null) {
259 logger.trace("CaddxCommunicator.run() Message expected. Nothing received");
261 // Message expected. Nothing received
262 if (outgoingMessage != null) {
263 transmitFirst(outgoingMessage); // put message in queue again
267 logger.trace("CaddxCommunicator.run() Message expected. Message received");
269 // Message expected. Message received.
270 int receivedMessageType = incomingMessage.getMessageType();
271 boolean isMessageExpected = IntStream.of(expectedMessageNumbers)
272 .anyMatch(x -> x == receivedMessageType);
274 if (!isMessageExpected) {
275 logger.trace("Non expected message received exp:{}, recv: {}", expectedMessageNumbers,
276 receivedMessageType);
278 // Non expected reply received
279 if (outgoingMessage != null) {
280 transmitFirst(outgoingMessage); // put message in queue again
281 skipTransmit = true; // Skip the transmit on the next cycle to receive the panel message
287 // Inform the listeners
288 if (incomingMessage != null) {
289 if (incomingMessage.isChecksumCorrect()) {
290 for (CaddxPanelListener listener : listenerQueue) {
291 listener.caddxMessage(this, incomingMessage);
295 "CaddxCommunicator.run() Received packet checksum does not match. in: {} {}, calc {} {}",
296 incomingMessage.getChecksum1In(), incomingMessage.getChecksum2In(),
297 incomingMessage.getChecksum1Calc(), incomingMessage.getChecksum2Calc());
301 } catch (IOException e) {
302 logger.debug("CaddxCommunicator.run() IOException. Stopping sender thread. {}", getSerialPortName());
303 Thread.currentThread().interrupt();
306 logger.warn("CaddxCommunicator.run() Sender thread stopped. {}", getSerialPortName());
310 * Event handler to receive the data from the serial port
312 * @param SerialPortEvent serialPortEvent The event that occurred on the serial port
315 public void serialEvent(@Nullable SerialPortEvent serialPortEvent) {
316 if (serialPortEvent == null) {
320 if (serialPortEvent.getEventType() == SerialPortEvent.DATA_AVAILABLE) {
321 logger.trace("Data receiving from the serial port");
322 if (protocol == CaddxProtocol.Binary) {
323 receiveInBinaryProtocol(serialPortEvent);
325 receiveInAsciiProtocol(serialPortEvent);
330 private int readByte(InputStream stream) throws IOException {
333 if (stream.available() > 0) {
337 throw new EOFException();
342 private int readAsciiByte(InputStream stream) throws IOException {
343 if (!haveFirstByte) { // this is the 1st digit
344 int b = readByte(in);
345 tempAsciiByte = (b >= 0x30 && b <= 0x39) ? (b - 0x30) * 0x10 : (b - 0x37) * 0x10;
346 haveFirstByte = true;
349 if (haveFirstByte) { // this is the 2nd digit
350 int b = readByte(in);
351 tempAsciiByte += (b >= 0x30 && b <= 0x39) ? (b - 0x30) : (b - 0x37);
352 haveFirstByte = false;
355 return tempAsciiByte;
358 private void loopUntilByteIsRead(InputStream stream, int byteToRead) throws IOException {
362 } while (b != byteToRead);
365 private void offerCaddxMessage() throws InterruptedException {
366 logger.trace("Offering received message");
368 // Full message received in data byte array
369 CaddxMessage caddxMessage = new CaddxMessage(message, true);
370 if (!exchanger.offer(caddxMessage, 3, TimeUnit.SECONDS)) {
371 logger.debug("Offered message was not received");
375 private void receiveInBinaryProtocol(SerialPortEvent serialPortEvent) {
377 // Read the start byte
378 if (!inMessage) // skip until 0x7E
380 loopUntilByteIsRead(in, 0x7e);
383 messageBufferLength = 0;
385 logger.trace("CaddxCommunicator.handleBinaryProtocol() Got start byte");
387 // Read the message length
388 if (messageBufferLength == 0) {
389 int b = readByte(in);
390 messageBufferLength = b + 2; // add two bytes for the checksum
391 message = new byte[messageBufferLength];
392 messageBufferIndex = 0;
394 logger.trace("CaddxCommunicator.handleBinaryProtocol() Got message length {}", messageBufferLength);
398 int b = readByte(in);
399 message[messageBufferIndex] = (byte) b;
401 if (message[messageBufferIndex] == 0x7D) {
407 message[messageBufferIndex] |= 0x20;
411 messageBufferIndex++;
412 } while (messageBufferIndex < messageBufferLength);
417 logger.trace("CaddxCommunicator.handleBinaryProtocol() Got message {}", message[0]);
418 } catch (EOFException e) {
420 } catch (IOException e) {
421 } catch (InterruptedException e) {
422 logger.trace("InterruptedException caught.");
423 Thread.currentThread().interrupt();
426 // Initialize state for a new reception
428 messageBufferLength = 0;
429 messageBufferIndex = 0;
433 private void receiveInAsciiProtocol(SerialPortEvent serialPortEvent) {
435 // Read the start byte
437 loopUntilByteIsRead(in, 0x0a);
440 haveFirstByte = false;
441 messageBufferLength = 0;
443 logger.trace("CaddxCommunicator.handleAsciiProtocol() Got start byte");
445 // Read the message length
446 if (messageBufferLength == 0) {
447 int b = readAsciiByte(in);
448 messageBufferLength = b + 2; // add 2 bytes for the checksum
449 message = new byte[messageBufferLength];
451 logger.trace("CaddxCommunicator.handleAsciiProtocol() Got message length {}", messageBufferLength);
455 int b = readAsciiByte(in);
456 message[messageBufferIndex] = (byte) b;
457 messageBufferIndex++;
458 } while (messageBufferIndex < messageBufferLength);
463 logger.trace("CaddxCommunicator.handleAsciiProtocol() Got message {}", message[0]);
464 } catch (EOFException e) {
466 } catch (IOException e) {
467 } catch (InterruptedException e) {
468 logger.trace("InterruptedException caught.");
469 Thread.currentThread().interrupt();
472 // Initialize state for a new reception
474 messageBufferLength = 0;
475 messageBufferIndex = 0;