2 * Copyright (c) 2010-2023 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.caddx.internal;
15 import java.io.EOFException;
16 import java.io.IOException;
17 import java.io.InputStream;
18 import java.io.OutputStream;
19 import java.util.Deque;
20 import java.util.HashSet;
22 import java.util.TooManyListenersException;
23 import java.util.concurrent.LinkedBlockingDeque;
24 import java.util.concurrent.SynchronousQueue;
25 import java.util.concurrent.TimeUnit;
26 import java.util.stream.IntStream;
28 import org.eclipse.jdt.annotation.NonNullByDefault;
29 import org.eclipse.jdt.annotation.Nullable;
30 import org.openhab.core.io.transport.serial.PortInUseException;
31 import org.openhab.core.io.transport.serial.SerialPort;
32 import org.openhab.core.io.transport.serial.SerialPortEvent;
33 import org.openhab.core.io.transport.serial.SerialPortEventListener;
34 import org.openhab.core.io.transport.serial.SerialPortIdentifier;
35 import org.openhab.core.io.transport.serial.SerialPortManager;
36 import org.openhab.core.io.transport.serial.UnsupportedCommOperationException;
37 import org.openhab.core.util.HexUtils;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
42 * The {@link CaddxCommunicator} is responsible for the asynchronous serial communication
44 * @author Georgios Moutsos - Initial contribution
47 public class CaddxCommunicator implements SerialPortEventListener {
48 private final Logger logger = LoggerFactory.getLogger(CaddxCommunicator.class);
50 private final SerialPortManager portManager;
51 private final Set<CaddxPanelListener> listenerQueue = new HashSet<>();
52 private final Deque<CaddxMessage> messages = new LinkedBlockingDeque<>();
53 private final SynchronousQueue<CaddxMessage> exchanger = new SynchronousQueue<>();
54 private final Thread communicator;
55 private final CaddxProtocol protocol;
56 private final String serialPortName;
57 private final int baudRate;
58 private final SerialPort serialPort;
59 private final InputStream in;
60 private final OutputStream out;
62 // Receiver state variables
63 private boolean inMessage = false;
64 private boolean haveFirstByte = false;
65 private int messageBufferLength = 0;
66 private byte[] message;
67 private int messageBufferIndex = 0;
68 private boolean unStuff = false;
69 private int tempAsciiByte = 0;
71 public CaddxCommunicator(String uid, SerialPortManager portManager, CaddxProtocol protocol, String serialPortName,
73 throws UnsupportedCommOperationException, PortInUseException, IOException, TooManyListenersException {
74 this.portManager = portManager;
75 this.protocol = protocol;
76 this.serialPortName = serialPortName;
77 this.baudRate = baudRate;
79 SerialPortIdentifier portIdentifier = this.portManager.getIdentifier(serialPortName);
80 if (portIdentifier == null) {
81 throw new IOException("Cannot get the port identifier.");
83 serialPort = portIdentifier.open(this.getClass().getName(), 2000);
84 serialPort.setSerialPortParams(baudRate, SerialPort.DATABITS_8, SerialPort.STOPBITS_1, SerialPort.PARITY_NONE);
85 serialPort.enableReceiveThreshold(1);
86 serialPort.disableReceiveTimeout();
88 InputStream localIn = serialPort.getInputStream();
89 if (localIn == null) {
90 logger.warn("Cannot get the input stream of the serial port");
91 throw new IOException("Input stream is null");
95 OutputStream localOut = serialPort.getOutputStream();
96 if (localOut == null) {
97 logger.warn("Cannot get the output stream of the serial port");
98 throw new IOException("Output stream is null");
102 serialPort.notifyOnDataAvailable(true);
103 serialPort.addEventListener(this);
105 communicator = new Thread(this::messageDispatchLoop, "OH-binding-" + uid + "-caddxCommunicator");
106 communicator.setDaemon(true);
107 communicator.start();
109 message = new byte[0];
111 logger.trace("CaddxCommunicator communication thread started successfully for {}", serialPortName);
114 public CaddxProtocol getProtocol() {
118 public String getSerialPortName() {
119 return serialPortName;
122 public int getBaudRate() {
126 public void addListener(CaddxPanelListener listener) {
127 listenerQueue.add(listener);
131 * Send message to panel. Asynchronous, i.e. returns immediately.
132 * Messages are sent only when panel is ready (i.e. sent an
133 * acknowledgment to last message), but no checks are implemented that
134 * the message was correctly received and executed.
136 * @param msg Data to be sent to panel. First byte is message type.
137 * Fletcher sum is computed and appended by transmit.
139 public void transmit(CaddxMessage msg) {
144 * Adds this message before any others in the queue.
145 * Used by receiver to send ACKs.
147 * @param msg The message
149 public void transmitFirst(CaddxMessage msg) {
150 messages.addFirst(msg);
154 logger.trace("CaddxCommunicator stopping");
156 // kick thread out of waiting for FIFO
157 communicator.interrupt();
159 // Close the streams first to unblock blocked reads and writes
162 } catch (IOException e) {
166 } catch (IOException e) {
169 // Wait until communication thread exits
171 communicator.join(3000);
172 } catch (InterruptedException e) {
175 // Also close the serial port
176 serialPort.removeEventListener();
180 @SuppressWarnings("null")
181 private void messageDispatchLoop() {
182 int[] expectedMessageNumbers = null;
183 CaddxMessage outgoingMessage = null;
184 boolean skipTransmit = true;
185 CaddxMessageContext context = null;
188 // loop until the thread is interrupted, sending out messages
189 while (!Thread.currentThread().isInterrupted()) {
190 // Initialize the state
191 outgoingMessage = null;
193 expectedMessageNumbers = null;
196 // send next outgoing message if we have one
197 outgoingMessage = messages.poll();
198 if (outgoingMessage != null) {
199 logger.trace("CaddxCommunicator.run() Outgoing message: {}", outgoingMessage.getMessageType());
201 byte[] msg = outgoingMessage.getMessageFrameBytes(protocol);
205 expectedMessageNumbers = outgoingMessage.getReplyMessageNumbers();
206 context = outgoingMessage.getContext();
209 if (logger.isDebugEnabled()) {
210 logger.debug("->: {}", outgoingMessage.getName());
211 logger.debug("->: {}", HexUtils
212 .bytesToHex(outgoingMessage.getMessageFrameBytes(CaddxProtocol.Binary), " "));
216 logger.trace("CaddxCommunicator.run() skipTransmit: true");
217 skipTransmit = false;
220 // Check for an incoming message
221 CaddxMessage incomingMessage = null;
223 incomingMessage = exchanger.poll(3, TimeUnit.SECONDS);
224 } catch (InterruptedException e) {
225 logger.debug("CaddxCommunicator.run() InterruptedException caught.");
226 Thread.currentThread().interrupt();
230 if (incomingMessage == null) {
231 if (expectedMessageNumbers == null) { // Nothing expected, Nothing received we continue
232 logger.trace("CaddxCommunicator.run(): Nothing expected, Nothing received we continue");
236 if (logger.isDebugEnabled()) {
237 logger.debug("<-: {}", incomingMessage.getName());
238 logger.debug("<-: {}",
239 HexUtils.bytesToHex(incomingMessage.getMessageFrameBytes(CaddxProtocol.Binary), " "));
243 // Check if we wait for a reply
244 if (expectedMessageNumbers == null) {
245 if (incomingMessage != null) { // Nothing expected. Message received.
246 logger.trace("CaddxCommunicator.run() Nothing expected, Message received");
248 // Check if Acknowledgement handling is required.
249 if (incomingMessage.hasAcknowledgementFlag()) {
250 if (incomingMessage.isChecksumCorrect()) {
252 transmitFirst(new CaddxMessage(CaddxMessageContext.NONE,
253 CaddxMessageType.POSITIVE_ACKNOWLEDGE, ""));
256 transmitFirst(new CaddxMessage(CaddxMessageContext.NONE,
257 CaddxMessageType.NEGATIVE_ACKNOWLEDGE, ""));
262 if (incomingMessage == null) {
263 logger.trace("CaddxCommunicator.run() Message expected. Nothing received");
265 // Message expected. Nothing received
266 if (outgoingMessage != null) {
267 transmitFirst(outgoingMessage); // put message in queue again
271 logger.trace("CaddxCommunicator.run() Message expected. Message received");
273 // Message expected. Message received.
274 int receivedMessageType = incomingMessage.getMessageType();
275 boolean isMessageExpected = IntStream.of(expectedMessageNumbers)
276 .anyMatch(x -> x == receivedMessageType);
278 if (!isMessageExpected) {
279 logger.trace("Non expected message received exp:{}, recv: {}", expectedMessageNumbers,
280 receivedMessageType);
282 // Non expected reply received
283 if (outgoingMessage != null) {
284 transmitFirst(outgoingMessage); // put message in queue again
285 skipTransmit = true; // Skip the transmit on the next cycle to receive the panel message
291 // Inform the listeners
292 if (incomingMessage != null) {
293 if (incomingMessage.isChecksumCorrect()) {
294 for (CaddxPanelListener listener : listenerQueue) {
295 if (context != null) {
296 incomingMessage.setContext(context);
298 listener.caddxMessage(incomingMessage);
302 "CaddxCommunicator.run() Received packet checksum does not match. in: {} {}, calc {} {}",
303 incomingMessage.getChecksum1In(), incomingMessage.getChecksum2In(),
304 incomingMessage.getChecksum1Calc(), incomingMessage.getChecksum2Calc());
308 } catch (IOException e) {
309 logger.debug("CaddxCommunicator.run() IOException. Stopping sender thread. {}", getSerialPortName());
310 Thread.currentThread().interrupt();
313 logger.warn("CaddxCommunicator.run() Sender thread stopped. {}", getSerialPortName());
317 * Event handler to receive the data from the serial port
319 * @param serialPortEvent The event that occurred on the serial port
322 public void serialEvent(@Nullable SerialPortEvent serialPortEvent) {
323 if (serialPortEvent == null) {
327 if (serialPortEvent.getEventType() == SerialPortEvent.DATA_AVAILABLE) {
328 logger.trace("Data receiving from the serial port");
329 if (protocol == CaddxProtocol.Binary) {
330 receiveInBinaryProtocol(serialPortEvent);
332 receiveInAsciiProtocol(serialPortEvent);
337 private int readByte(InputStream stream) throws IOException {
340 if (stream.available() > 0) {
344 throw new EOFException();
349 private int readAsciiByte(InputStream stream) throws IOException {
350 if (!haveFirstByte) { // this is the 1st digit
351 int b = readByte(in);
352 tempAsciiByte = (b >= 0x30 && b <= 0x39) ? (b - 0x30) * 0x10 : (b - 0x37) * 0x10;
353 haveFirstByte = true;
356 if (haveFirstByte) { // this is the 2nd digit
357 int b = readByte(in);
358 tempAsciiByte += (b >= 0x30 && b <= 0x39) ? (b - 0x30) : (b - 0x37);
359 haveFirstByte = false;
362 return tempAsciiByte;
365 private void loopUntilByteIsRead(InputStream stream, int byteToRead) throws IOException {
369 } while (b != byteToRead);
372 private void offerCaddxMessage() throws InterruptedException {
373 logger.trace("Offering received message");
375 // Full message received in data byte array
376 CaddxMessage caddxMessage = new CaddxMessage(CaddxMessageContext.NONE, message, true);
377 if (!exchanger.offer(caddxMessage, 3, TimeUnit.SECONDS)) {
378 logger.debug("Offered message was not received");
382 private void receiveInBinaryProtocol(SerialPortEvent serialPortEvent) {
384 // Read the start byte
385 if (!inMessage) // skip until 0x7E
387 loopUntilByteIsRead(in, 0x7e);
390 messageBufferLength = 0;
392 logger.trace("CaddxCommunicator.handleBinaryProtocol() Got start byte");
394 // Read the message length
395 if (messageBufferLength == 0) {
396 int b = readByte(in);
397 messageBufferLength = b + 2; // add two bytes for the checksum
398 message = new byte[messageBufferLength];
399 messageBufferIndex = 0;
401 logger.trace("CaddxCommunicator.handleBinaryProtocol() Got message length {}", messageBufferLength);
405 int b = readByte(in);
406 message[messageBufferIndex] = (byte) b;
408 if (message[messageBufferIndex] == 0x7D) {
414 message[messageBufferIndex] |= 0x20;
418 messageBufferIndex++;
419 } while (messageBufferIndex < messageBufferLength);
424 logger.trace("CaddxCommunicator.handleBinaryProtocol() Got message {}", message[0]);
425 } catch (EOFException e) {
427 } catch (IOException e) {
428 } catch (InterruptedException e) {
429 logger.trace("InterruptedException caught.");
430 Thread.currentThread().interrupt();
433 // Initialize state for a new reception
435 messageBufferLength = 0;
436 messageBufferIndex = 0;
440 private void receiveInAsciiProtocol(SerialPortEvent serialPortEvent) {
442 // Read the start byte
444 loopUntilByteIsRead(in, 0x0a);
447 haveFirstByte = false;
448 messageBufferLength = 0;
450 logger.trace("CaddxCommunicator.handleAsciiProtocol() Got start byte");
452 // Read the message length
453 if (messageBufferLength == 0) {
454 int b = readAsciiByte(in);
455 messageBufferLength = b + 2; // add 2 bytes for the checksum
456 message = new byte[messageBufferLength];
458 logger.trace("CaddxCommunicator.handleAsciiProtocol() Got message length {}", messageBufferLength);
462 int b = readAsciiByte(in);
463 message[messageBufferIndex] = (byte) b;
464 messageBufferIndex++;
465 } while (messageBufferIndex < messageBufferLength);
470 logger.trace("CaddxCommunicator.handleAsciiProtocol() Got message {}", message[0]);
471 } catch (EOFException e) {
473 } catch (IOException e) {
474 } catch (InterruptedException e) {
475 logger.trace("InterruptedException caught.");
476 Thread.currentThread().interrupt();
479 // Initialize state for a new reception
481 messageBufferLength = 0;
482 messageBufferIndex = 0;