2 * Copyright (c) 2010-2021 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.caddx.internal;
15 import java.io.EOFException;
16 import java.io.IOException;
17 import java.io.InputStream;
18 import java.io.OutputStream;
19 import java.util.Deque;
20 import java.util.HashSet;
22 import java.util.TooManyListenersException;
23 import java.util.concurrent.LinkedBlockingDeque;
24 import java.util.concurrent.SynchronousQueue;
25 import java.util.concurrent.TimeUnit;
26 import java.util.stream.IntStream;
28 import org.eclipse.jdt.annotation.NonNullByDefault;
29 import org.eclipse.jdt.annotation.Nullable;
30 import org.openhab.core.io.transport.serial.PortInUseException;
31 import org.openhab.core.io.transport.serial.SerialPort;
32 import org.openhab.core.io.transport.serial.SerialPortEvent;
33 import org.openhab.core.io.transport.serial.SerialPortEventListener;
34 import org.openhab.core.io.transport.serial.SerialPortIdentifier;
35 import org.openhab.core.io.transport.serial.SerialPortManager;
36 import org.openhab.core.io.transport.serial.UnsupportedCommOperationException;
37 import org.openhab.core.util.HexUtils;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
42 * The {@link CaddxCommunicator} is responsible for the asynchronous serial communication
44 * @author Georgios Moutsos - Initial contribution
// Serial communicator for a Caddx panel. It implements SerialPortEventListener so that
// serial-port events are delivered to serialEvent(...) on this object.
47 public class CaddxCommunicator implements SerialPortEventListener {
48 private final Logger logger = LoggerFactory.getLogger(CaddxCommunicator.class);
// Used in the constructor to look up the port identifier by name.
50 private final SerialPortManager portManager;
// Listeners called by the dispatch loop for each received message whose checksum is correct.
// NOTE(review): plain HashSet, written by addListener(...) and iterated on the communicator
// thread - confirm listeners are only registered before messages start flowing.
51 private final Set<CaddxPanelListener> listenerQueue = new HashSet<>();
// Outgoing messages: the dispatch loop takes from the head (poll), transmitFirst(...) inserts at the head.
52 private final Deque<CaddxMessage> messages = new LinkedBlockingDeque<>();
// Hand-off point for received messages: offerCaddxMessage() offers, the dispatch loop polls.
53 private final SynchronousQueue<CaddxMessage> exchanger = new SynchronousQueue<>();
// Daemon thread that runs messageDispatchLoop(); created and started in the constructor.
54 private final Thread communicator;
// Selects the binary or the ASCII receiver in serialEvent(...) and the outgoing frame encoding.
55 private final CaddxProtocol protocol;
56 private final String serialPortName;
57 private final int baudRate;
58 private final SerialPort serialPort;
// Streams of the serial port. NOTE(review): their assignments are not visible in this view.
59 private final InputStream in;
60 private final OutputStream out;
62 // Receiver state variables
// Tested by the binary receiver before it searches for the 0x7E start byte.
63 private boolean inMessage = false;
// True once the first hex digit of an ASCII-encoded byte has been consumed (see readAsciiByte).
64 private boolean haveFirstByte = false;
// Announced length plus two checksum bytes; 0 means the length byte has not been read yet.
65 private int messageBufferLength = 0;
// Receive buffer, re-allocated for every frame once its length is known.
66 private byte[] message;
// Index in `message` where the next received byte is stored.
67 private int messageBufferIndex = 0;
// NOTE(review): no use of this flag is visible in this view; presumably it marks a pending
// 0x7D escape in the binary receiver - confirm.
68 private boolean unStuff = false;
// Accumulator for the two hex digits decoded by readAsciiByte.
69 private int tempAsciiByte = 0;
// Opens and configures the serial port, fetches its streams, registers this object as the
// port's event listener and starts the daemon thread that runs messageDispatchLoop().
// Throws IOException when the port identifier cannot be resolved or when either stream is null;
// the other declared exceptions come from the serial-port calls below.
// NOTE(review): the end of the parameter list (including baudRate) is not visible in this view.
// NOTE(review): no close of the opened port is visible on the null-stream failure paths.
71 public CaddxCommunicator(String uid, SerialPortManager portManager, CaddxProtocol protocol, String serialPortName,
73 throws UnsupportedCommOperationException, PortInUseException, IOException, TooManyListenersException {
74 this.portManager = portManager;
75 this.protocol = protocol;
76 this.serialPortName = serialPortName;
77 this.baudRate = baudRate;
// Resolve the port by name; a null identifier is reported as an IOException.
79 SerialPortIdentifier portIdentifier = this.portManager.getIdentifier(serialPortName);
80 if (portIdentifier == null) {
81 throw new IOException("Cannot get the port identifier.");
// Owner name is this class name. NOTE(review): 2000 is presumably the open timeout in ms - confirm.
83 serialPort = portIdentifier.open(this.getClass().getName(), 2000);
// 8 data bits, 1 stop bit, no parity at the requested baud rate.
84 serialPort.setSerialPortParams(baudRate, SerialPort.DATABITS_8, SerialPort.STOPBITS_1, SerialPort.PARITY_NONE);
// Receive threshold of 1 with the receive timeout disabled.
85 serialPort.enableReceiveThreshold(1);
86 serialPort.disableReceiveTimeout();
88 InputStream localIn = serialPort.getInputStream();
89 if (localIn == null) {
90 logger.warn("Cannot get the input stream of the serial port");
91 throw new IOException("Input stream is null");
95 OutputStream localOut = serialPort.getOutputStream();
96 if (localOut == null) {
97 logger.warn("Cannot get the output stream of the serial port");
98 throw new IOException("Output stream is null");
// Request DATA_AVAILABLE notifications and route them to serialEvent(...) of this object.
102 serialPort.notifyOnDataAvailable(true);
103 serialPort.addEventListener(this);
// Background daemon thread; its name embeds the uid passed by the caller.
105 communicator = new Thread(this::messageDispatchLoop, "OH-binding-" + uid + "-caddxCommunicator");
106 communicator.setDaemon(true);
107 communicator.start();
// Empty placeholder buffer; the receivers allocate the real buffer per frame.
// NOTE(review): this assignment happens after the listener is registered and the thread is started.
109 message = new byte[0];
111 logger.trace("CaddxCommunicator communication thread started successfully for {}", serialPortName);
// Accessor returning a CaddxProtocol.
// NOTE(review): the body is not visible in this view; presumably it returns the `protocol` field - confirm.
114 public CaddxProtocol getProtocol() {
// Returns the serial port name that was passed to the constructor.
118 public String getSerialPortName() {
119 return serialPortName;
// Accessor returning an int.
// NOTE(review): the body is not visible in this view; presumably it returns the `baudRate` field - confirm.
122 public int getBaudRate() {
// Registers a listener; the dispatch loop calls listener.caddxMessage(...) on every registered
// listener for each received message with a correct checksum.
// Because the backing collection is a Set, adding an equal listener again has no effect.
126 public void addListener(CaddxPanelListener listener) {
127 listenerQueue.add(listener);
131 * Send message to panel. Asynchronous, i.e. returns immediately.
132 * Messages are sent only when panel is ready (i.e. sent an
133 * acknowledgment to last message), but no checks are implemented that
134 * the message was correctly received and executed.
136 * @param msg Data to be sent to panel. First byte is message type.
137 * Fletcher sum is computed and appended by transmit.
// Queues a message for sending by the communicator thread.
// NOTE(review): the body is not visible in this view; presumably it appends msg to the tail of
// `messages` (transmitFirst(...) is the variant that inserts at the head) - confirm.
139 public void transmit(CaddxMessage msg) {
144 * Adds this message before any others in the queue.
145 * Used by receiver to send ACKs.
147 * @param msg The message
// Inserts msg at the head of the outgoing deque so the dispatch loop polls it before anything
// already queued. The dispatch loop also uses this to re-queue a message it has to retry.
149 public void transmitFirst(CaddxMessage msg) {
150 messages.addFirst(msg);
// Shutdown sequence: interrupt the communicator thread, close the streams, wait a bounded time
// for the thread to finish, then detach this listener from the serial port.
// NOTE(review): the method header, the stream close calls, the bodies of the catch blocks and
// the final port close are not visible in this view.
154 logger.trace("CaddxCommunicator stopping");
156 // kick thread out of waiting for FIFO
157 communicator.interrupt();
159 // Close the streams first to unblock blocked reads and writes
162 } catch (IOException e) {
166 } catch (IOException e) {
169 // Wait until communication thread exits
// join(3000) returns after at most 3000 ms even if the thread is still running.
171 communicator.join(3000);
172 } catch (InterruptedException e) {
175 // Also close the serial port
// Stop delivery of further serial events to this object.
176 serialPort.removeEventListener();
// Body of the communicator thread created in the constructor. Each pass of the loop:
//   1. takes the next outgoing message from `messages` (if any) and records which reply
//      message numbers it expects;
//   2. waits up to 3 seconds for a received message on `exchanger`;
//   3. queues a positive or negative acknowledge when the received message asks for one;
//   4. puts the outgoing message back at the head of the queue when the expected reply did not arrive;
//   5. passes received messages with a correct checksum to all registered listeners.
// The loop ends when the thread is interrupted; an IOException also interrupts the thread.
// NOTE(review): several short lines (braces, try/else, `continue`, the write to the output
// stream) are not visible in this view, so the exact nesting must be checked against the full file.
180 @SuppressWarnings("null")
181 private void messageDispatchLoop() {
182 int @Nullable [] expectedMessageNumbers = null;
185 CaddxMessage outgoingMessage = null;
// NOTE(review): starts as true and is cleared next to the "skipTransmit: true" trace below;
// presumably a true value makes one pass receive-only - confirm against the missing `if`/`else`.
186 boolean skipTransmit = true;
189 // loop until the thread is interrupted, sending out messages
190 while (!Thread.currentThread().isInterrupted()) {
191 // Initialize the state
192 outgoingMessage = null;
193 expectedMessageNumbers = null;
196 // send next outgoing message if we have one
// poll() does not block; it returns null when the queue is empty.
197 outgoingMessage = messages.poll();
198 if (outgoingMessage != null) {
199 logger.trace("CaddxCommunicator.run() Outgoing message: {}", outgoingMessage.getMessageType());
// Frame bytes encoded for the configured protocol.
// NOTE(review): the statement that writes `msg` to the port is not visible in this view.
201 byte[] msg = outgoingMessage.getMessageFrameBytes(protocol);
// Message numbers that count as a reply to the message taken from the queue.
205 expectedMessageNumbers = outgoingMessage.getReplyMessageNumbers();
// The debug dump always renders the Binary framing, whatever `protocol` is configured.
208 if (logger.isDebugEnabled()) {
209 logger.debug("->: {}", outgoingMessage.getName());
210 logger.debug("->: {}", HexUtils
211 .bytesToHex(outgoingMessage.getMessageFrameBytes(CaddxProtocol.Binary), " "));
215 logger.trace("CaddxCommunicator.run() skipTransmit: true");
216 skipTransmit = false;
219 // Check for an incoming message
220 CaddxMessage incomingMessage = null;
// Waits at most 3 seconds for the receiver side to hand over a message; null on timeout.
222 incomingMessage = exchanger.poll(3, TimeUnit.SECONDS);
223 } catch (InterruptedException e) {
224 logger.debug("CaddxCommunicator.run() InterruptedException caught.");
// Restore the interrupt flag so the while-condition above ends the loop.
225 Thread.currentThread().interrupt();
229 if (incomingMessage == null) {
230 if (expectedMessageNumbers == null) { // Nothing expected, Nothing received we continue
231 logger.trace("CaddxCommunicator.run(): Nothing expected, Nothing received we continue");
235 if (logger.isDebugEnabled()) {
236 logger.debug("<-: {}", incomingMessage.getName());
237 logger.debug("<-: {}",
238 HexUtils.bytesToHex(incomingMessage.getMessageFrameBytes(CaddxProtocol.Binary), " "));
242 // Check if we wait for a reply
243 if (expectedMessageNumbers == null) {
244 if (incomingMessage != null) { // Nothing expected. Message received.
245 logger.trace("CaddxCommunicator.run() Nothing expected, Message received");
247 // Check if Acknowledgement handling is required.
// The acknowledge goes to the head of the queue so it is sent before other pending messages.
// Correct checksum: POSITIVE_ACKNOWLEDGE; the NEGATIVE_ACKNOWLEDGE line is presumably the
// `else` branch (that line is not visible in this view).
248 if (incomingMessage.hasAcknowledgementFlag()) {
249 if (incomingMessage.isChecksumCorrect()) {
251 transmitFirst(new CaddxMessage(CaddxMessageType.POSITIVE_ACKNOWLEDGE, ""));
254 transmitFirst(new CaddxMessage(CaddxMessageType.NEGATIVE_ACKNOWLEDGE, ""));
259 if (incomingMessage == null) {
260 logger.trace("CaddxCommunicator.run() Message expected. Nothing received");
262 // Message expected. Nothing received
// Retry: the outgoing message goes back to the head of the queue.
263 if (outgoingMessage != null) {
264 transmitFirst(outgoingMessage); // put message in queue again
268 logger.trace("CaddxCommunicator.run() Message expected. Message received");
270 // Message expected. Message received.
// The reply is accepted when its message type equals any of the expected message numbers.
271 int receivedMessageType = incomingMessage.getMessageType();
272 boolean isMessageExpected = IntStream.of(expectedMessageNumbers)
273 .anyMatch(x -> x == receivedMessageType);
275 if (!isMessageExpected) {
276 logger.trace("Non expected message received exp:{}, recv: {}", expectedMessageNumbers,
277 receivedMessageType);
279 // Non expected reply received
280 if (outgoingMessage != null) {
281 transmitFirst(outgoingMessage); // put message in queue again
282 skipTransmit = true; // Skip the transmit on the next cycle to receive the panel message
288 // Inform the listeners
// Only messages with a correct checksum reach the listeners; a mismatch is only logged.
289 if (incomingMessage != null) {
290 if (incomingMessage.isChecksumCorrect()) {
291 for (CaddxPanelListener listener : listenerQueue) {
292 listener.caddxMessage(this, incomingMessage);
296 "CaddxCommunicator.run() Received packet checksum does not match. in: {} {}, calc {} {}",
297 incomingMessage.getChecksum1In(), incomingMessage.getChecksum2In(),
298 incomingMessage.getChecksum1Calc(), incomingMessage.getChecksum2Calc());
302 } catch (IOException e) {
303 logger.debug("CaddxCommunicator.run() IOException. Stopping sender thread. {}", getSerialPortName());
// Setting the interrupt flag on the current thread makes the while-condition end the loop.
304 Thread.currentThread().interrupt();
307 logger.warn("CaddxCommunicator.run() Sender thread stopped. {}", getSerialPortName());
311 * Event handler to receive the data from the serial port
313 * @param serialPortEvent The event that occurred on the serial port
// Callback from the serial layer. For a DATA_AVAILABLE event it runs the receiver that matches
// the configured protocol: the binary receiver for CaddxProtocol.Binary, the ASCII receiver otherwise.
// NOTE(review): the body of the null check is not visible in this view; presumably it returns early.
316 public void serialEvent(@Nullable SerialPortEvent serialPortEvent) {
317 if (serialPortEvent == null) {
321 if (serialPortEvent.getEventType() == SerialPortEvent.DATA_AVAILABLE) {
322 logger.trace("Data receiving from the serial port");
323 if (protocol == CaddxProtocol.Binary) {
324 receiveInBinaryProtocol(serialPortEvent);
326 receiveInAsciiProtocol(serialPortEvent);
// Reads one byte from the given stream and returns it as an int.
// The visible lines test stream.available() > 0 before reading and throw EOFException when no
// byte could be obtained; both receivers catch that exception.
// NOTE(review): the read itself, the condition guarding the throw and the return statement are
// not visible in this view.
331 private int readByte(InputStream stream) throws IOException {
334 if (stream.available() > 0) {
338 throw new EOFException();
// Decodes one byte value from two ASCII hex digits (high nibble first) and returns it.
// Digits '0'..'9' (0x30..0x39) map to 0..9; every other character is converted with (b - 0x37),
// which is correct for upper-case 'A'..'F' only.
// NOTE(review): lower-case hex digits or any other character are not rejected and would yield a
// wrong value.
// NOTE(review): the `stream` parameter is never used; both reads go through the `in` field.
// The visible callers pass `in`, so this makes no difference today.
// haveFirstByte/tempAsciiByte are fields, so if readByte throws on the second digit the first
// digit is kept and the next call continues with the second digit.
343 private int readAsciiByte(InputStream stream) throws IOException {
344 if (!haveFirstByte) { // this is the 1st digit
345 int b = readByte(in);
346 tempAsciiByte = (b >= 0x30 && b <= 0x39) ? (b - 0x30) * 0x10 : (b - 0x37) * 0x10;
347 haveFirstByte = true;
350 if (haveFirstByte) { // this is the 2nd digit
351 int b = readByte(in);
352 tempAsciiByte += (b >= 0x30 && b <= 0x39) ? (b - 0x30) : (b - 0x37);
353 haveFirstByte = false;
356 return tempAsciiByte;
// Consumes input until a byte equal to byteToRead has been read; the receivers use it to find
// the start byte of a frame (0x7e in the binary receiver, 0x0a in the ASCII receiver).
// NOTE(review): the loop head and the read inside it are not visible in this view; presumably
// each iteration calls readByte(stream), so an EOFException would end the search - confirm.
359 private void loopUntilByteIsRead(InputStream stream, int byteToRead) throws IOException {
363 } while (b != byteToRead);
// Wraps the current receive buffer in a CaddxMessage and hands it to the communicator thread
// through `exchanger`, waiting up to 3 seconds for that thread to take it.
// If nobody takes it in time the only visible reaction is a debug log; no retry is visible.
// Throws InterruptedException when the waiting thread is interrupted.
// NOTE(review): the meaning of the `true` constructor argument cannot be told from this file.
366 private void offerCaddxMessage() throws InterruptedException {
367 logger.trace("Offering received message");
369 // Full message received in data byte array
370 CaddxMessage caddxMessage = new CaddxMessage(message, true);
371 if (!exchanger.offer(caddxMessage, 3, TimeUnit.SECONDS)) {
372 logger.debug("Offered message was not received");
// Receives one frame in the binary protocol: finds the 0x7E start byte, reads the length byte,
// then fills `message` with (length + 2) bytes - the announced bytes plus two checksum bytes.
// The serialPortEvent parameter is not used in any visible line.
// NOTE(review): many short lines (try, do, braces, state updates, the hand-over of the finished
// message) are not visible in this view. The InterruptedException catch suggests that
// offerCaddxMessage() is called on one of them - confirm.
376 private void receiveInBinaryProtocol(SerialPortEvent serialPortEvent) {
378 // Read the start byte
379 if (!inMessage) // skip until 0x7E
381 loopUntilByteIsRead(in, 0x7e);
// A length of 0 marks "length byte not read yet" for the check below.
384 messageBufferLength = 0;
386 logger.trace("CaddxCommunicator.handleBinaryProtocol() Got start byte");
388 // Read the message length
389 if (messageBufferLength == 0) {
390 int b = readByte(in);
391 messageBufferLength = b + 2; // add two bytes for the checksum
392 message = new byte[messageBufferLength];
393 messageBufferIndex = 0;
395 logger.trace("CaddxCommunicator.handleBinaryProtocol() Got message length {}", messageBufferLength);
// Read loop: one byte per iteration until the buffer is full.
399 int b = readByte(in);
400 message[messageBufferIndex] = (byte) b;
// 0x7D handling: the visible lines detect 0x7D and OR the stored byte with 0x20.
// NOTE(review): the lines in between are missing; presumably this undoes byte stuffing by
// reading the following byte first - confirm against the full file.
402 if (message[messageBufferIndex] == 0x7D) {
408 message[messageBufferIndex] |= 0x20;
412 messageBufferIndex++;
413 } while (messageBufferIndex < messageBufferLength);
418 logger.trace("CaddxCommunicator.handleBinaryProtocol() Got message {}", message[0]);
// NOTE(review): the bodies of the EOFException and IOException handlers are not visible here.
419 } catch (EOFException e) {
421 } catch (IOException e) {
422 } catch (InterruptedException e) {
423 logger.trace("InterruptedException caught.");
// Keep the interrupt visible to the calling thread.
424 Thread.currentThread().interrupt();
427 // Initialize state for a new reception
429 messageBufferLength = 0;
430 messageBufferIndex = 0;
// Receives one frame in the ASCII protocol: finds the 0x0a start byte, then reads the length and
// (length + 2) data bytes, each transmitted as two hex digits and decoded by readAsciiByte.
// The serialPortEvent parameter is not used in any visible line.
// NOTE(review): many short lines (try, do, braces, the hand-over of the finished message) are
// not visible in this view. The InterruptedException catch suggests that offerCaddxMessage()
// is called on one of them - confirm.
434 private void receiveInAsciiProtocol(SerialPortEvent serialPortEvent) {
436 // Read the start byte
438 loopUntilByteIsRead(in, 0x0a);
// Reset the hex-digit state so decoding starts with a high nibble.
441 haveFirstByte = false;
442 messageBufferLength = 0;
444 logger.trace("CaddxCommunicator.handleAsciiProtocol() Got start byte");
446 // Read the message length
447 if (messageBufferLength == 0) {
448 int b = readAsciiByte(in);
449 messageBufferLength = b + 2; // add 2 bytes for the checksum
450 message = new byte[messageBufferLength];
452 logger.trace("CaddxCommunicator.handleAsciiProtocol() Got message length {}", messageBufferLength);
// Read loop: one decoded byte per iteration until the buffer is full.
456 int b = readAsciiByte(in);
457 message[messageBufferIndex] = (byte) b;
458 messageBufferIndex++;
459 } while (messageBufferIndex < messageBufferLength);
464 logger.trace("CaddxCommunicator.handleAsciiProtocol() Got message {}", message[0]);
// NOTE(review): the bodies of the EOFException and IOException handlers are not visible here.
465 } catch (EOFException e) {
467 } catch (IOException e) {
468 } catch (InterruptedException e) {
469 logger.trace("InterruptedException caught.");
// Keep the interrupt visible to the calling thread.
470 Thread.currentThread().interrupt();
473 // Initialize state for a new reception
475 messageBufferLength = 0;
476 messageBufferIndex = 0;