]> git.basschouten.com Git - openhab-addons.git/blob
c06204c20f510b82aef8e28ada3c3604b061ef14
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2021 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.caddx.internal;
14
15 import java.io.EOFException;
16 import java.io.IOException;
17 import java.io.InputStream;
18 import java.io.OutputStream;
19 import java.util.Deque;
20 import java.util.HashSet;
21 import java.util.Set;
22 import java.util.TooManyListenersException;
23 import java.util.concurrent.LinkedBlockingDeque;
24 import java.util.concurrent.SynchronousQueue;
25 import java.util.concurrent.TimeUnit;
26 import java.util.stream.IntStream;
27
28 import org.eclipse.jdt.annotation.NonNullByDefault;
29 import org.eclipse.jdt.annotation.Nullable;
30 import org.openhab.core.io.transport.serial.PortInUseException;
31 import org.openhab.core.io.transport.serial.SerialPort;
32 import org.openhab.core.io.transport.serial.SerialPortEvent;
33 import org.openhab.core.io.transport.serial.SerialPortEventListener;
34 import org.openhab.core.io.transport.serial.SerialPortIdentifier;
35 import org.openhab.core.io.transport.serial.SerialPortManager;
36 import org.openhab.core.io.transport.serial.UnsupportedCommOperationException;
37 import org.openhab.core.util.HexUtils;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40
41 /**
42  * The {@link CaddxCommunicator} is responsible for the asynchronous serial communication
43  *
44  * @author Georgios Moutsos - Initial contribution
45  */
46 @NonNullByDefault
47 public class CaddxCommunicator implements SerialPortEventListener {
48     private final Logger logger = LoggerFactory.getLogger(CaddxCommunicator.class);
49
50     private final SerialPortManager portManager;
51     private final Set<CaddxPanelListener> listenerQueue = new HashSet<>();
52     private final Deque<CaddxMessage> messages = new LinkedBlockingDeque<>();
53     private final SynchronousQueue<CaddxMessage> exchanger = new SynchronousQueue<>();
54     private final Thread communicator;
55     private final CaddxProtocol protocol;
56     private final String serialPortName;
57     private final int baudRate;
58     private final SerialPort serialPort;
59     private final InputStream in;
60     private final OutputStream out;
61
62     // Receiver state variables
63     private boolean inMessage = false;
64     private boolean haveFirstByte = false;
65     private int messageBufferLength = 0;
66     private byte[] message;
67     private int messageBufferIndex = 0;
68     private boolean unStuff = false;
69     private int tempAsciiByte = 0;
70
71     public CaddxCommunicator(String uid, SerialPortManager portManager, CaddxProtocol protocol, String serialPortName,
72             int baudRate)
73             throws UnsupportedCommOperationException, PortInUseException, IOException, TooManyListenersException {
74         this.portManager = portManager;
75         this.protocol = protocol;
76         this.serialPortName = serialPortName;
77         this.baudRate = baudRate;
78
79         SerialPortIdentifier portIdentifier = this.portManager.getIdentifier(serialPortName);
80         if (portIdentifier == null) {
81             throw new IOException("Cannot get the port identifier.");
82         }
83         serialPort = portIdentifier.open(this.getClass().getName(), 2000);
84         serialPort.setSerialPortParams(baudRate, SerialPort.DATABITS_8, SerialPort.STOPBITS_1, SerialPort.PARITY_NONE);
85         serialPort.enableReceiveThreshold(1);
86         serialPort.disableReceiveTimeout();
87
88         InputStream localIn = serialPort.getInputStream();
89         if (localIn == null) {
90             logger.warn("Cannot get the input stream of the serial port");
91             throw new IOException("Input stream is null");
92         }
93         in = localIn;
94
95         OutputStream localOut = serialPort.getOutputStream();
96         if (localOut == null) {
97             logger.warn("Cannot get the output stream of the serial port");
98             throw new IOException("Output stream is null");
99         }
100         out = localOut;
101
102         serialPort.notifyOnDataAvailable(true);
103         serialPort.addEventListener(this);
104
105         communicator = new Thread(this::messageDispatchLoop, "OH-binding-" + uid + "-caddxCommunicator");
106         communicator.setDaemon(true);
107         communicator.start();
108
109         message = new byte[0];
110
111         logger.trace("CaddxCommunicator communication thread started successfully for {}", serialPortName);
112     }
113
114     public CaddxProtocol getProtocol() {
115         return protocol;
116     }
117
118     public String getSerialPortName() {
119         return serialPortName;
120     }
121
122     public int getBaudRate() {
123         return baudRate;
124     }
125
126     public void addListener(CaddxPanelListener listener) {
127         listenerQueue.add(listener);
128     }
129
130     /**
131      * Send message to panel. Asynchronous, i.e. returns immediately.
132      * Messages are sent only when panel is ready (i.e. sent an
133      * acknowledgment to last message), but no checks are implemented that
134      * the message was correctly received and executed.
135      *
136      * @param msg Data to be sent to panel. First byte is message type.
137      *            Fletcher sum is computed and appended by transmit.
138      */
139     public void transmit(CaddxMessage msg) {
140         messages.add(msg);
141     }
142
143     /**
144      * Adds this message before any others in the queue.
145      * Used by receiver to send ACKs.
146      *
147      * @param msg The message
148      */
149     public void transmitFirst(CaddxMessage msg) {
150         messages.addFirst(msg);
151     }
152
153     public void stop() {
154         logger.trace("CaddxCommunicator stopping");
155
156         // kick thread out of waiting for FIFO
157         communicator.interrupt();
158
159         // Close the streams first to unblock blocked reads and writes
160         try {
161             in.close();
162         } catch (IOException e) {
163         }
164         try {
165             out.close();
166         } catch (IOException e) {
167         }
168
169         // Wait until communication thread exits
170         try {
171             communicator.join(3000);
172         } catch (InterruptedException e) {
173         }
174
175         // Also close the serial port
176         serialPort.removeEventListener();
177         serialPort.close();
178     }
179
180     @SuppressWarnings("null")
181     private void messageDispatchLoop() {
182         int @Nullable [] expectedMessageNumbers = null;
183
184         @Nullable
185         CaddxMessage outgoingMessage = null;
186         boolean skipTransmit = true;
187
188         try {
189             // loop until the thread is interrupted, sending out messages
190             while (!Thread.currentThread().isInterrupted()) {
191                 // Initialize the state
192                 outgoingMessage = null;
193                 expectedMessageNumbers = null;
194
195                 if (!skipTransmit) {
196                     // send next outgoing message if we have one
197                     outgoingMessage = messages.poll();
198                     if (outgoingMessage != null) {
199                         logger.trace("CaddxCommunicator.run() Outgoing message: {}", outgoingMessage.getMessageType());
200
201                         byte[] msg = outgoingMessage.getMessageFrameBytes(protocol);
202                         out.write(msg);
203                         out.flush();
204
205                         expectedMessageNumbers = outgoingMessage.getReplyMessageNumbers();
206
207                         // Log message
208                         if (logger.isDebugEnabled()) {
209                             logger.debug("->: {}", outgoingMessage.getName());
210                             logger.debug("->: {}", HexUtils
211                                     .bytesToHex(outgoingMessage.getMessageFrameBytes(CaddxProtocol.Binary), " "));
212                         }
213                     }
214                 } else {
215                     logger.trace("CaddxCommunicator.run() skipTransmit: true");
216                     skipTransmit = false;
217                 }
218
219                 // Check for an incoming message
220                 CaddxMessage incomingMessage = null;
221                 try {
222                     incomingMessage = exchanger.poll(3, TimeUnit.SECONDS);
223                 } catch (InterruptedException e) {
224                     logger.debug("CaddxCommunicator.run() InterruptedException caught.");
225                     Thread.currentThread().interrupt();
226                 }
227
228                 // Log
229                 if (incomingMessage == null) {
230                     if (expectedMessageNumbers == null) { // Nothing expected, Nothing received we continue
231                         logger.trace("CaddxCommunicator.run(): Nothing expected, Nothing received we continue");
232                         continue;
233                     }
234                 } else {
235                     if (logger.isDebugEnabled()) {
236                         logger.debug("<-: {}", incomingMessage.getName());
237                         logger.debug("<-: {}",
238                                 HexUtils.bytesToHex(incomingMessage.getMessageFrameBytes(CaddxProtocol.Binary), " "));
239                     }
240                 }
241
242                 // Check if we wait for a reply
243                 if (expectedMessageNumbers == null) {
244                     if (incomingMessage != null) { // Nothing expected. Message received.
245                         logger.trace("CaddxCommunicator.run() Nothing expected, Message received");
246
247                         // Check if Acknowledgement handling is required.
248                         if (incomingMessage.hasAcknowledgementFlag()) {
249                             if (incomingMessage.isChecksumCorrect()) {
250                                 // send ACK
251                                 transmitFirst(new CaddxMessage(CaddxMessageType.POSITIVE_ACKNOWLEDGE, ""));
252                             } else {
253                                 // Send NAK
254                                 transmitFirst(new CaddxMessage(CaddxMessageType.NEGATIVE_ACKNOWLEDGE, ""));
255                             }
256                         }
257                     }
258                 } else {
259                     if (incomingMessage == null) {
260                         logger.trace("CaddxCommunicator.run() Message expected. Nothing received");
261
262                         // Message expected. Nothing received
263                         if (outgoingMessage != null) {
264                             transmitFirst(outgoingMessage); // put message in queue again
265                             continue;
266                         }
267                     } else {
268                         logger.trace("CaddxCommunicator.run() Message expected. Message received");
269
270                         // Message expected. Message received.
271                         int receivedMessageType = incomingMessage.getMessageType();
272                         boolean isMessageExpected = IntStream.of(expectedMessageNumbers)
273                                 .anyMatch(x -> x == receivedMessageType);
274
275                         if (!isMessageExpected) {
276                             logger.trace("Non expected message received exp:{}, recv: {}", expectedMessageNumbers,
277                                     receivedMessageType);
278
279                             // Non expected reply received
280                             if (outgoingMessage != null) {
281                                 transmitFirst(outgoingMessage); // put message in queue again
282                                 skipTransmit = true; // Skip the transmit on the next cycle to receive the panel message
283                             }
284                         }
285                     }
286                 }
287
288                 // Inform the listeners
289                 if (incomingMessage != null) {
290                     if (incomingMessage.isChecksumCorrect()) {
291                         for (CaddxPanelListener listener : listenerQueue) {
292                             listener.caddxMessage(this, incomingMessage);
293                         }
294                     } else {
295                         logger.warn(
296                                 "CaddxCommunicator.run() Received packet checksum does not match. in: {} {}, calc {} {}",
297                                 incomingMessage.getChecksum1In(), incomingMessage.getChecksum2In(),
298                                 incomingMessage.getChecksum1Calc(), incomingMessage.getChecksum2Calc());
299                     }
300                 }
301             }
302         } catch (IOException e) {
303             logger.debug("CaddxCommunicator.run() IOException. Stopping sender thread. {}", getSerialPortName());
304             Thread.currentThread().interrupt();
305         }
306
307         logger.warn("CaddxCommunicator.run() Sender thread stopped. {}", getSerialPortName());
308     }
309
310     /**
311      * Event handler to receive the data from the serial port
312      *
313      * @param SerialPortEvent serialPortEvent The event that occurred on the serial port
314      */
315     @Override
316     public void serialEvent(@Nullable SerialPortEvent serialPortEvent) {
317         if (serialPortEvent == null) {
318             return;
319         }
320
321         if (serialPortEvent.getEventType() == SerialPortEvent.DATA_AVAILABLE) {
322             logger.trace("Data receiving from the serial port");
323             if (protocol == CaddxProtocol.Binary) {
324                 receiveInBinaryProtocol(serialPortEvent);
325             } else {
326                 receiveInAsciiProtocol(serialPortEvent);
327             }
328         }
329     }
330
331     private int readByte(InputStream stream) throws IOException {
332         int b = -1;
333
334         if (stream.available() > 0) {
335             b = stream.read();
336         }
337         if (b == -1) {
338             throw new EOFException();
339         }
340         return b;
341     }
342
343     private int readAsciiByte(InputStream stream) throws IOException {
344         if (!haveFirstByte) { // this is the 1st digit
345             int b = readByte(in);
346             tempAsciiByte = (b >= 0x30 && b <= 0x39) ? (b - 0x30) * 0x10 : (b - 0x37) * 0x10;
347             haveFirstByte = true;
348         }
349
350         if (haveFirstByte) { // this is the 2nd digit
351             int b = readByte(in);
352             tempAsciiByte += (b >= 0x30 && b <= 0x39) ? (b - 0x30) : (b - 0x37);
353             haveFirstByte = false;
354         }
355
356         return tempAsciiByte;
357     }
358
359     private void loopUntilByteIsRead(InputStream stream, int byteToRead) throws IOException {
360         int b = 0;
361         do {
362             b = readByte(in);
363         } while (b != byteToRead);
364     }
365
366     private void offerCaddxMessage() throws InterruptedException {
367         logger.trace("Offering received message");
368
369         // Full message received in data byte array
370         CaddxMessage caddxMessage = new CaddxMessage(message, true);
371         if (!exchanger.offer(caddxMessage, 3, TimeUnit.SECONDS)) {
372             logger.debug("Offered message was not received");
373         }
374     }
375
376     private void receiveInBinaryProtocol(SerialPortEvent serialPortEvent) {
377         try {
378             // Read the start byte
379             if (!inMessage) // skip until 0x7E
380             {
381                 loopUntilByteIsRead(in, 0x7e);
382
383                 inMessage = true;
384                 messageBufferLength = 0;
385             }
386             logger.trace("CaddxCommunicator.handleBinaryProtocol() Got start byte");
387
388             // Read the message length
389             if (messageBufferLength == 0) {
390                 int b = readByte(in);
391                 messageBufferLength = b + 2; // add two bytes for the checksum
392                 message = new byte[messageBufferLength];
393                 messageBufferIndex = 0;
394             }
395             logger.trace("CaddxCommunicator.handleBinaryProtocol() Got message length {}", messageBufferLength);
396
397             // Read the message
398             do {
399                 int b = readByte(in);
400                 message[messageBufferIndex] = (byte) b;
401
402                 if (message[messageBufferIndex] == 0x7D) {
403                     unStuff = true;
404                     continue;
405                 }
406
407                 if (unStuff) {
408                     message[messageBufferIndex] |= 0x20;
409                     unStuff = false;
410                 }
411
412                 messageBufferIndex++;
413             } while (messageBufferIndex < messageBufferLength);
414
415             // Offer the message
416             offerCaddxMessage();
417
418             logger.trace("CaddxCommunicator.handleBinaryProtocol() Got message {}", message[0]);
419         } catch (EOFException e) {
420             return;
421         } catch (IOException e) {
422         } catch (InterruptedException e) {
423             logger.trace("InterruptedException caught.");
424             Thread.currentThread().interrupt();
425         }
426
427         // Initialize state for a new reception
428         inMessage = false;
429         messageBufferLength = 0;
430         messageBufferIndex = 0;
431         unStuff = false;
432     }
433
434     private void receiveInAsciiProtocol(SerialPortEvent serialPortEvent) {
435         try {
436             // Read the start byte
437             if (!inMessage) {
438                 loopUntilByteIsRead(in, 0x0a);
439
440                 inMessage = true;
441                 haveFirstByte = false;
442                 messageBufferLength = 0;
443             }
444             logger.trace("CaddxCommunicator.handleAsciiProtocol() Got start byte");
445
446             // Read the message length
447             if (messageBufferLength == 0) {
448                 int b = readAsciiByte(in);
449                 messageBufferLength = b + 2; // add 2 bytes for the checksum
450                 message = new byte[messageBufferLength];
451             }
452             logger.trace("CaddxCommunicator.handleAsciiProtocol() Got message length {}", messageBufferLength);
453
454             // Read the message
455             do {
456                 int b = readAsciiByte(in);
457                 message[messageBufferIndex] = (byte) b;
458                 messageBufferIndex++;
459             } while (messageBufferIndex < messageBufferLength);
460
461             // Offer the message
462             offerCaddxMessage();
463
464             logger.trace("CaddxCommunicator.handleAsciiProtocol() Got message {}", message[0]);
465         } catch (EOFException e) {
466             return;
467         } catch (IOException e) {
468         } catch (InterruptedException e) {
469             logger.trace("InterruptedException caught.");
470             Thread.currentThread().interrupt();
471         }
472
473         // Initialize state for a new reception
474         inMessage = false;
475         messageBufferLength = 0;
476         messageBufferIndex = 0;
477     }
478 }