]> git.basschouten.com Git - openhab-addons.git/blob
4dea3886a5cd58d66ee2949f4d6fa4a86d8150be
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2021 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.caddx.internal;
14
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;
import java.util.TooManyListenersException;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.openhab.core.io.transport.serial.PortInUseException;
import org.openhab.core.io.transport.serial.SerialPort;
import org.openhab.core.io.transport.serial.SerialPortEvent;
import org.openhab.core.io.transport.serial.SerialPortEventListener;
import org.openhab.core.io.transport.serial.SerialPortIdentifier;
import org.openhab.core.io.transport.serial.SerialPortManager;
import org.openhab.core.io.transport.serial.UnsupportedCommOperationException;
import org.openhab.core.util.HexUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
40
41 /**
42  * The {@link CaddxCommunicator} is responsible for the asynchronous serial communication
43  *
44  * @author Georgios Moutsos - Initial contribution
45  */
46 @NonNullByDefault
47 public class CaddxCommunicator implements SerialPortEventListener {
48     private final Logger logger = LoggerFactory.getLogger(CaddxCommunicator.class);
49
50     private final SerialPortManager portManager;
51     private final Set<CaddxPanelListener> listenerQueue = new HashSet<>();
52     private final Deque<CaddxMessage> messages = new LinkedBlockingDeque<>();
53     private final SynchronousQueue<CaddxMessage> exchanger = new SynchronousQueue<>();
54     private final Thread communicator;
55     private final CaddxProtocol protocol;
56     private final String serialPortName;
57     private final int baudRate;
58     private final SerialPort serialPort;
59     private final InputStream in;
60     private final OutputStream out;
61
62     // Receiver state variables
63     private boolean inMessage = false;
64     private boolean haveFirstByte = false;
65     private int messageBufferLength = 0;
66     private byte[] message;
67     private int messageBufferIndex = 0;
68     private boolean unStuff = false;
69     private int tempAsciiByte = 0;
70
71     public CaddxCommunicator(SerialPortManager portManager, CaddxProtocol protocol, String serialPortName, int baudRate)
72             throws UnsupportedCommOperationException, PortInUseException, IOException, TooManyListenersException {
73         this.portManager = portManager;
74         this.protocol = protocol;
75         this.serialPortName = serialPortName;
76         this.baudRate = baudRate;
77
78         SerialPortIdentifier portIdentifier = this.portManager.getIdentifier(serialPortName);
79         if (portIdentifier == null) {
80             throw new IOException("Cannot get the port identifier.");
81         }
82         serialPort = portIdentifier.open(this.getClass().getName(), 2000);
83         serialPort.setSerialPortParams(baudRate, SerialPort.DATABITS_8, SerialPort.STOPBITS_1, SerialPort.PARITY_NONE);
84         serialPort.enableReceiveThreshold(1);
85         serialPort.disableReceiveTimeout();
86
87         InputStream localIn = serialPort.getInputStream();
88         if (localIn == null) {
89             logger.warn("Cannot get the input stream of the serial port");
90             throw new IOException("Input stream is null");
91         }
92         in = localIn;
93
94         OutputStream localOut = serialPort.getOutputStream();
95         if (localOut == null) {
96             logger.warn("Cannot get the output stream of the serial port");
97             throw new IOException("Output stream is null");
98         }
99         out = localOut;
100
101         serialPort.notifyOnDataAvailable(true);
102         serialPort.addEventListener(this);
103
104         communicator = new Thread(this::messageDispatchLoop, "Caddx Communicator");
105         communicator.setDaemon(true);
106         communicator.start();
107
108         message = new byte[0];
109
110         logger.trace("CaddxCommunicator communication thread started successfully for {}", serialPortName);
111     }
112
113     public CaddxProtocol getProtocol() {
114         return protocol;
115     }
116
117     public String getSerialPortName() {
118         return serialPortName;
119     }
120
121     public int getBaudRate() {
122         return baudRate;
123     }
124
125     public void addListener(CaddxPanelListener listener) {
126         listenerQueue.add(listener);
127     }
128
129     /**
130      * Send message to panel. Asynchronous, i.e. returns immediately.
131      * Messages are sent only when panel is ready (i.e. sent an
132      * acknowledgment to last message), but no checks are implemented that
133      * the message was correctly received and executed.
134      *
135      * @param msg Data to be sent to panel. First byte is message type.
136      *            Fletcher sum is computed and appended by transmit.
137      */
138     public void transmit(CaddxMessage msg) {
139         messages.add(msg);
140     }
141
142     /**
143      * Adds this message before any others in the queue.
144      * Used by receiver to send ACKs.
145      *
146      * @param msg The message
147      */
148     public void transmitFirst(CaddxMessage msg) {
149         messages.addFirst(msg);
150     }
151
152     public void stop() {
153         logger.trace("CaddxCommunicator stopping");
154
155         // kick thread out of waiting for FIFO
156         communicator.interrupt();
157
158         // Close the streams first to unblock blocked reads and writes
159         try {
160             in.close();
161         } catch (IOException e) {
162         }
163         try {
164             out.close();
165         } catch (IOException e) {
166         }
167
168         // Wait until communication thread exits
169         try {
170             communicator.join(3000);
171         } catch (InterruptedException e) {
172         }
173
174         // Also close the serial port
175         serialPort.removeEventListener();
176         serialPort.close();
177     }
178
179     @SuppressWarnings("null")
180     private void messageDispatchLoop() {
181         int @Nullable [] expectedMessageNumbers = null;
182
183         @Nullable
184         CaddxMessage outgoingMessage = null;
185         boolean skipTransmit = true;
186
187         try {
188             // loop until the thread is interrupted, sending out messages
189             while (!Thread.currentThread().isInterrupted()) {
190                 // Initialize the state
191                 outgoingMessage = null;
192                 expectedMessageNumbers = null;
193
194                 if (!skipTransmit) {
195                     // send next outgoing message if we have one
196                     outgoingMessage = messages.poll();
197                     if (outgoingMessage != null) {
198                         logger.trace("CaddxCommunicator.run() Outgoing message: {}", outgoingMessage.getMessageType());
199
200                         byte[] msg = outgoingMessage.getMessageFrameBytes(protocol);
201                         out.write(msg);
202                         out.flush();
203
204                         expectedMessageNumbers = outgoingMessage.getReplyMessageNumbers();
205
206                         // Log message
207                         if (logger.isDebugEnabled()) {
208                             logger.debug("->: {}", outgoingMessage.getName());
209                             logger.debug("->: {}", HexUtils
210                                     .bytesToHex(outgoingMessage.getMessageFrameBytes(CaddxProtocol.Binary), " "));
211                         }
212                     }
213                 } else {
214                     logger.trace("CaddxCommunicator.run() skipTransmit: true");
215                     skipTransmit = false;
216                 }
217
218                 // Check for an incoming message
219                 CaddxMessage incomingMessage = null;
220                 try {
221                     incomingMessage = exchanger.poll(3, TimeUnit.SECONDS);
222                 } catch (InterruptedException e) {
223                     logger.debug("CaddxCommunicator.run() InterruptedException caught.");
224                     Thread.currentThread().interrupt();
225                 }
226
227                 // Log
228                 if (incomingMessage == null) {
229                     if (expectedMessageNumbers == null) { // Nothing expected, Nothing received we continue
230                         logger.trace("CaddxCommunicator.run(): Nothing expected, Nothing received we continue");
231                         continue;
232                     }
233                 } else {
234                     if (logger.isDebugEnabled()) {
235                         logger.debug("<-: {}", incomingMessage.getName());
236                         logger.debug("<-: {}",
237                                 HexUtils.bytesToHex(incomingMessage.getMessageFrameBytes(CaddxProtocol.Binary), " "));
238                     }
239                 }
240
241                 // Check if we wait for a reply
242                 if (expectedMessageNumbers == null) {
243                     if (incomingMessage != null) { // Nothing expected. Message received.
244                         logger.trace("CaddxCommunicator.run() Nothing expected, Message received");
245
246                         // Check if Acknowledgement handling is required.
247                         if (incomingMessage.hasAcknowledgementFlag()) {
248                             if (incomingMessage.isChecksumCorrect()) {
249                                 // send ACK
250                                 transmitFirst(new CaddxMessage(CaddxMessageType.POSITIVE_ACKNOWLEDGE, ""));
251                             } else {
252                                 // Send NAK
253                                 transmitFirst(new CaddxMessage(CaddxMessageType.NEGATIVE_ACKNOWLEDGE, ""));
254                             }
255                         }
256                     }
257                 } else {
258                     if (incomingMessage == null) {
259                         logger.trace("CaddxCommunicator.run() Message expected. Nothing received");
260
261                         // Message expected. Nothing received
262                         if (outgoingMessage != null) {
263                             transmitFirst(outgoingMessage); // put message in queue again
264                             continue;
265                         }
266                     } else {
267                         logger.trace("CaddxCommunicator.run() Message expected. Message received");
268
269                         // Message expected. Message received.
270                         int receivedMessageType = incomingMessage.getMessageType();
271                         boolean isMessageExpected = IntStream.of(expectedMessageNumbers)
272                                 .anyMatch(x -> x == receivedMessageType);
273
274                         if (!isMessageExpected) {
275                             logger.trace("Non expected message received exp:{}, recv: {}", expectedMessageNumbers,
276                                     receivedMessageType);
277
278                             // Non expected reply received
279                             if (outgoingMessage != null) {
280                                 transmitFirst(outgoingMessage); // put message in queue again
281                                 skipTransmit = true; // Skip the transmit on the next cycle to receive the panel message
282                             }
283                         }
284                     }
285                 }
286
287                 // Inform the listeners
288                 if (incomingMessage != null) {
289                     if (incomingMessage.isChecksumCorrect()) {
290                         for (CaddxPanelListener listener : listenerQueue) {
291                             listener.caddxMessage(this, incomingMessage);
292                         }
293                     } else {
294                         logger.warn(
295                                 "CaddxCommunicator.run() Received packet checksum does not match. in: {} {}, calc {} {}",
296                                 incomingMessage.getChecksum1In(), incomingMessage.getChecksum2In(),
297                                 incomingMessage.getChecksum1Calc(), incomingMessage.getChecksum2Calc());
298                     }
299                 }
300             }
301         } catch (IOException e) {
302             logger.debug("CaddxCommunicator.run() IOException. Stopping sender thread. {}", getSerialPortName());
303             Thread.currentThread().interrupt();
304         }
305
306         logger.warn("CaddxCommunicator.run() Sender thread stopped. {}", getSerialPortName());
307     }
308
309     /**
310      * Event handler to receive the data from the serial port
311      *
312      * @param SerialPortEvent serialPortEvent The event that occurred on the serial port
313      */
314     @Override
315     public void serialEvent(@Nullable SerialPortEvent serialPortEvent) {
316         if (serialPortEvent == null) {
317             return;
318         }
319
320         if (serialPortEvent.getEventType() == SerialPortEvent.DATA_AVAILABLE) {
321             logger.trace("Data receiving from the serial port");
322             if (protocol == CaddxProtocol.Binary) {
323                 receiveInBinaryProtocol(serialPortEvent);
324             } else {
325                 receiveInAsciiProtocol(serialPortEvent);
326             }
327         }
328     }
329
330     private int readByte(InputStream stream) throws IOException {
331         int b = -1;
332
333         if (stream.available() > 0) {
334             b = stream.read();
335         }
336         if (b == -1) {
337             throw new EOFException();
338         }
339         return b;
340     }
341
342     private int readAsciiByte(InputStream stream) throws IOException {
343         if (!haveFirstByte) { // this is the 1st digit
344             int b = readByte(in);
345             tempAsciiByte = (b >= 0x30 && b <= 0x39) ? (b - 0x30) * 0x10 : (b - 0x37) * 0x10;
346             haveFirstByte = true;
347         }
348
349         if (haveFirstByte) { // this is the 2nd digit
350             int b = readByte(in);
351             tempAsciiByte += (b >= 0x30 && b <= 0x39) ? (b - 0x30) : (b - 0x37);
352             haveFirstByte = false;
353         }
354
355         return tempAsciiByte;
356     }
357
358     private void loopUntilByteIsRead(InputStream stream, int byteToRead) throws IOException {
359         int b = 0;
360         do {
361             b = readByte(in);
362         } while (b != byteToRead);
363     }
364
365     private void offerCaddxMessage() throws InterruptedException {
366         logger.trace("Offering received message");
367
368         // Full message received in data byte array
369         CaddxMessage caddxMessage = new CaddxMessage(message, true);
370         if (!exchanger.offer(caddxMessage, 3, TimeUnit.SECONDS)) {
371             logger.debug("Offered message was not received");
372         }
373     }
374
375     private void receiveInBinaryProtocol(SerialPortEvent serialPortEvent) {
376         try {
377             // Read the start byte
378             if (!inMessage) // skip until 0x7E
379             {
380                 loopUntilByteIsRead(in, 0x7e);
381
382                 inMessage = true;
383                 messageBufferLength = 0;
384             }
385             logger.trace("CaddxCommunicator.handleBinaryProtocol() Got start byte");
386
387             // Read the message length
388             if (messageBufferLength == 0) {
389                 int b = readByte(in);
390                 messageBufferLength = b + 2; // add two bytes for the checksum
391                 message = new byte[messageBufferLength];
392                 messageBufferIndex = 0;
393             }
394             logger.trace("CaddxCommunicator.handleBinaryProtocol() Got message length {}", messageBufferLength);
395
396             // Read the message
397             do {
398                 int b = readByte(in);
399                 message[messageBufferIndex] = (byte) b;
400
401                 if (message[messageBufferIndex] == 0x7D) {
402                     unStuff = true;
403                     continue;
404                 }
405
406                 if (unStuff) {
407                     message[messageBufferIndex] |= 0x20;
408                     unStuff = false;
409                 }
410
411                 messageBufferIndex++;
412             } while (messageBufferIndex < messageBufferLength);
413
414             // Offer the message
415             offerCaddxMessage();
416
417             logger.trace("CaddxCommunicator.handleBinaryProtocol() Got message {}", message[0]);
418         } catch (EOFException e) {
419             return;
420         } catch (IOException e) {
421         } catch (InterruptedException e) {
422             logger.trace("InterruptedException caught.");
423             Thread.currentThread().interrupt();
424         }
425
426         // Initialize state for a new reception
427         inMessage = false;
428         messageBufferLength = 0;
429         messageBufferIndex = 0;
430         unStuff = false;
431     }
432
433     private void receiveInAsciiProtocol(SerialPortEvent serialPortEvent) {
434         try {
435             // Read the start byte
436             if (!inMessage) {
437                 loopUntilByteIsRead(in, 0x0a);
438
439                 inMessage = true;
440                 haveFirstByte = false;
441                 messageBufferLength = 0;
442             }
443             logger.trace("CaddxCommunicator.handleAsciiProtocol() Got start byte");
444
445             // Read the message length
446             if (messageBufferLength == 0) {
447                 int b = readAsciiByte(in);
448                 messageBufferLength = b + 2; // add 2 bytes for the checksum
449                 message = new byte[messageBufferLength];
450             }
451             logger.trace("CaddxCommunicator.handleAsciiProtocol() Got message length {}", messageBufferLength);
452
453             // Read the message
454             do {
455                 int b = readAsciiByte(in);
456                 message[messageBufferIndex] = (byte) b;
457                 messageBufferIndex++;
458             } while (messageBufferIndex < messageBufferLength);
459
460             // Offer the message
461             offerCaddxMessage();
462
463             logger.trace("CaddxCommunicator.handleAsciiProtocol() Got message {}", message[0]);
464         } catch (EOFException e) {
465             return;
466         } catch (IOException e) {
467         } catch (InterruptedException e) {
468             logger.trace("InterruptedException caught.");
469             Thread.currentThread().interrupt();
470         }
471
472         // Initialize state for a new reception
473         inMessage = false;
474         messageBufferLength = 0;
475         messageBufferIndex = 0;
476     }
477 }