]> git.basschouten.com Git - openhab-addons.git/blob
fd9b9253793cd869ce996e9459f03afecec207d4
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2023 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.caddx.internal;
14
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;
import java.util.TooManyListenersException;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.openhab.core.io.transport.serial.PortInUseException;
import org.openhab.core.io.transport.serial.SerialPort;
import org.openhab.core.io.transport.serial.SerialPortEvent;
import org.openhab.core.io.transport.serial.SerialPortEventListener;
import org.openhab.core.io.transport.serial.SerialPortIdentifier;
import org.openhab.core.io.transport.serial.SerialPortManager;
import org.openhab.core.io.transport.serial.UnsupportedCommOperationException;
import org.openhab.core.util.HexUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
40
41 /**
42  * The {@link CaddxCommunicator} is responsible for the asynchronous serial communication
43  *
44  * @author Georgios Moutsos - Initial contribution
45  */
46 @NonNullByDefault
47 public class CaddxCommunicator implements SerialPortEventListener {
48     private final Logger logger = LoggerFactory.getLogger(CaddxCommunicator.class);
49
50     private final SerialPortManager portManager;
51     private final Set<CaddxPanelListener> listenerQueue = new HashSet<>();
52     private final Deque<CaddxMessage> messages = new LinkedBlockingDeque<>();
53     private final SynchronousQueue<CaddxMessage> exchanger = new SynchronousQueue<>();
54     private final Thread communicator;
55     private final CaddxProtocol protocol;
56     private final String serialPortName;
57     private final int baudRate;
58     private final SerialPort serialPort;
59     private final InputStream in;
60     private final OutputStream out;
61
62     // Receiver state variables
63     private boolean inMessage = false;
64     private boolean haveFirstByte = false;
65     private int messageBufferLength = 0;
66     private byte[] message;
67     private int messageBufferIndex = 0;
68     private boolean unStuff = false;
69     private int tempAsciiByte = 0;
70
71     public CaddxCommunicator(String uid, SerialPortManager portManager, CaddxProtocol protocol, String serialPortName,
72             int baudRate)
73             throws UnsupportedCommOperationException, PortInUseException, IOException, TooManyListenersException {
74         this.portManager = portManager;
75         this.protocol = protocol;
76         this.serialPortName = serialPortName;
77         this.baudRate = baudRate;
78
79         SerialPortIdentifier portIdentifier = this.portManager.getIdentifier(serialPortName);
80         if (portIdentifier == null) {
81             throw new IOException("Cannot get the port identifier.");
82         }
83         serialPort = portIdentifier.open(this.getClass().getName(), 2000);
84         serialPort.setSerialPortParams(baudRate, SerialPort.DATABITS_8, SerialPort.STOPBITS_1, SerialPort.PARITY_NONE);
85         serialPort.enableReceiveThreshold(1);
86         serialPort.disableReceiveTimeout();
87
88         InputStream localIn = serialPort.getInputStream();
89         if (localIn == null) {
90             logger.warn("Cannot get the input stream of the serial port");
91             throw new IOException("Input stream is null");
92         }
93         in = localIn;
94
95         OutputStream localOut = serialPort.getOutputStream();
96         if (localOut == null) {
97             logger.warn("Cannot get the output stream of the serial port");
98             throw new IOException("Output stream is null");
99         }
100         out = localOut;
101
102         serialPort.notifyOnDataAvailable(true);
103         serialPort.addEventListener(this);
104
105         communicator = new Thread(this::messageDispatchLoop, "OH-binding-" + uid + "-caddxCommunicator");
106         communicator.setDaemon(true);
107         communicator.start();
108
109         message = new byte[0];
110
111         logger.trace("CaddxCommunicator communication thread started successfully for {}", serialPortName);
112     }
113
114     public CaddxProtocol getProtocol() {
115         return protocol;
116     }
117
118     public String getSerialPortName() {
119         return serialPortName;
120     }
121
122     public int getBaudRate() {
123         return baudRate;
124     }
125
126     public void addListener(CaddxPanelListener listener) {
127         listenerQueue.add(listener);
128     }
129
130     /**
131      * Send message to panel. Asynchronous, i.e. returns immediately.
132      * Messages are sent only when panel is ready (i.e. sent an
133      * acknowledgment to last message), but no checks are implemented that
134      * the message was correctly received and executed.
135      *
136      * @param msg Data to be sent to panel. First byte is message type.
137      *            Fletcher sum is computed and appended by transmit.
138      */
139     public void transmit(CaddxMessage msg) {
140         messages.add(msg);
141     }
142
143     /**
144      * Adds this message before any others in the queue.
145      * Used by receiver to send ACKs.
146      *
147      * @param msg The message
148      */
149     public void transmitFirst(CaddxMessage msg) {
150         messages.addFirst(msg);
151     }
152
153     public void stop() {
154         logger.trace("CaddxCommunicator stopping");
155
156         // kick thread out of waiting for FIFO
157         communicator.interrupt();
158
159         // Close the streams first to unblock blocked reads and writes
160         try {
161             in.close();
162         } catch (IOException e) {
163         }
164         try {
165             out.close();
166         } catch (IOException e) {
167         }
168
169         // Wait until communication thread exits
170         try {
171             communicator.join(3000);
172         } catch (InterruptedException e) {
173         }
174
175         // Also close the serial port
176         serialPort.removeEventListener();
177         serialPort.close();
178     }
179
    /**
     * Body of the communicator thread. Until the thread is interrupted (or writing to the port fails
     * with an {@link IOException}) each cycle optionally writes the next queued outgoing message,
     * then waits up to three seconds for a received message handed over through {@code exchanger},
     * reconciles it with the reply that was expected, and forwards messages with a correct checksum
     * to all registered listeners.
     */
    @SuppressWarnings("null")
    private void messageDispatchLoop() {
        int[] expectedMessageNumbers = null;
        CaddxMessage outgoingMessage = null;
        // Starts as true, so the very first cycle only listens and does not transmit
        boolean skipTransmit = true;
        CaddxMessageContext context = null;

        try {
            // loop until the thread is interrupted, sending out messages
            while (!Thread.currentThread().isInterrupted()) {
                // Initialize the state
                outgoingMessage = null;
                context = null;
                expectedMessageNumbers = null;

                if (!skipTransmit) {
                    // send next outgoing message if we have one
                    outgoingMessage = messages.poll();
                    if (outgoingMessage != null) {
                        logger.trace("CaddxCommunicator.run() Outgoing message: {}", outgoingMessage.getMessageType());

                        byte[] msg = outgoingMessage.getMessageFrameBytes(protocol);
                        out.write(msg);
                        out.flush();

                        // Remember which replies this message allows and the context to stamp on the reply
                        expectedMessageNumbers = outgoingMessage.getReplyMessageNumbers();
                        context = outgoingMessage.getContext();

                        // Log message
                        if (logger.isDebugEnabled()) {
                            logger.debug("->: {}", outgoingMessage.getName());
                            logger.debug("->: {}", HexUtils
                                    .bytesToHex(outgoingMessage.getMessageFrameBytes(CaddxProtocol.Binary), " "));
                        }
                    }
                } else {
                    // One listen-only cycle was requested; transmit again from the next cycle on
                    logger.trace("CaddxCommunicator.run() skipTransmit: true");
                    skipTransmit = false;
                }

                // Check for an incoming message
                CaddxMessage incomingMessage = null;
                try {
                    incomingMessage = exchanger.poll(3, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    logger.debug("CaddxCommunicator.run() InterruptedException caught.");
                    // Restore the flag so the loop condition ends the thread after this cycle
                    Thread.currentThread().interrupt();
                }

                // Log
                if (incomingMessage == null) {
                    if (expectedMessageNumbers == null) { // Nothing expected, Nothing received we continue
                        logger.trace("CaddxCommunicator.run(): Nothing expected, Nothing received we continue");
                        continue;
                    }
                } else {
                    if (logger.isDebugEnabled()) {
                        logger.debug("<-: {}", incomingMessage.getName());
                        logger.debug("<-: {}",
                                HexUtils.bytesToHex(incomingMessage.getMessageFrameBytes(CaddxProtocol.Binary), " "));
                    }
                }

                // Check if we wait for a reply
                if (expectedMessageNumbers == null) {
                    if (incomingMessage != null) { // Nothing expected. Message received.
                        logger.trace("CaddxCommunicator.run() Nothing expected, Message received");

                        // Check if Acknowledgement handling is required.
                        if (incomingMessage.hasAcknowledgementFlag()) {
                            // The ACK/NAK goes to the head of the queue so it is written on the next transmit
                            if (incomingMessage.isChecksumCorrect()) {
                                // send ACK
                                transmitFirst(new CaddxMessage(CaddxMessageContext.NONE,
                                        CaddxMessageType.POSITIVE_ACKNOWLEDGE, ""));
                            } else {
                                // Send NAK
                                transmitFirst(new CaddxMessage(CaddxMessageContext.NONE,
                                        CaddxMessageType.NEGATIVE_ACKNOWLEDGE, ""));
                            }
                        }
                    }
                } else {
                    if (incomingMessage == null) {
                        logger.trace("CaddxCommunicator.run() Message expected. Nothing received");

                        // Message expected. Nothing received
                        if (outgoingMessage != null) {
                            transmitFirst(outgoingMessage); // put message in queue again
                            continue;
                        }
                    } else {
                        logger.trace("CaddxCommunicator.run() Message expected. Message received");

                        // Message expected. Message received.
                        int receivedMessageType = incomingMessage.getMessageType();
                        boolean isMessageExpected = IntStream.of(expectedMessageNumbers)
                                .anyMatch(x -> x == receivedMessageType);

                        if (!isMessageExpected) {
                            logger.trace("Non expected message received exp:{}, recv: {}", expectedMessageNumbers,
                                    receivedMessageType);

                            // Non expected reply received
                            if (outgoingMessage != null) {
                                transmitFirst(outgoingMessage); // put message in queue again
                                skipTransmit = true; // Skip the transmit on the next cycle to receive the panel message
                            }
                        }
                    }
                }

                // Inform the listeners
                if (incomingMessage != null) {
                    if (incomingMessage.isChecksumCorrect()) {
                        for (CaddxPanelListener listener : listenerQueue) {
                            // Stamp the context of the message sent in this cycle, if there was one
                            if (context != null) {
                                incomingMessage.setContext(context);
                            }
                            listener.caddxMessage(incomingMessage);
                        }
                    } else {
                        logger.warn(
                                "CaddxCommunicator.run() Received packet checksum does not match. in: {} {}, calc {} {}",
                                incomingMessage.getChecksum1In(), incomingMessage.getChecksum2In(),
                                incomingMessage.getChecksum1Calc(), incomingMessage.getChecksum2Calc());
                    }
                }
            }
        } catch (IOException e) {
            // An IOException raised inside the loop (e.g. by the writes to the port) ends the thread
            logger.debug("CaddxCommunicator.run() IOException. Stopping sender thread. {}", getSerialPortName());
            Thread.currentThread().interrupt();
        }

        logger.warn("CaddxCommunicator.run() Sender thread stopped. {}", getSerialPortName());
    }
315
316     /**
317      * Event handler to receive the data from the serial port
318      *
319      * @param SerialPortEvent serialPortEvent The event that occurred on the serial port
320      */
321     @Override
322     public void serialEvent(@Nullable SerialPortEvent serialPortEvent) {
323         if (serialPortEvent == null) {
324             return;
325         }
326
327         if (serialPortEvent.getEventType() == SerialPortEvent.DATA_AVAILABLE) {
328             logger.trace("Data receiving from the serial port");
329             if (protocol == CaddxProtocol.Binary) {
330                 receiveInBinaryProtocol(serialPortEvent);
331             } else {
332                 receiveInAsciiProtocol(serialPortEvent);
333             }
334         }
335     }
336
337     private int readByte(InputStream stream) throws IOException {
338         int b = -1;
339
340         if (stream.available() > 0) {
341             b = stream.read();
342         }
343         if (b == -1) {
344             throw new EOFException();
345         }
346         return b;
347     }
348
349     private int readAsciiByte(InputStream stream) throws IOException {
350         if (!haveFirstByte) { // this is the 1st digit
351             int b = readByte(in);
352             tempAsciiByte = (b >= 0x30 && b <= 0x39) ? (b - 0x30) * 0x10 : (b - 0x37) * 0x10;
353             haveFirstByte = true;
354         }
355
356         if (haveFirstByte) { // this is the 2nd digit
357             int b = readByte(in);
358             tempAsciiByte += (b >= 0x30 && b <= 0x39) ? (b - 0x30) : (b - 0x37);
359             haveFirstByte = false;
360         }
361
362         return tempAsciiByte;
363     }
364
365     private void loopUntilByteIsRead(InputStream stream, int byteToRead) throws IOException {
366         int b = 0;
367         do {
368             b = readByte(in);
369         } while (b != byteToRead);
370     }
371
372     private void offerCaddxMessage() throws InterruptedException {
373         logger.trace("Offering received message");
374
375         // Full message received in data byte array
376         CaddxMessage caddxMessage = new CaddxMessage(CaddxMessageContext.NONE, message, true);
377         if (!exchanger.offer(caddxMessage, 3, TimeUnit.SECONDS)) {
378             logger.debug("Offered message was not received");
379         }
380     }
381
382     private void receiveInBinaryProtocol(SerialPortEvent serialPortEvent) {
383         try {
384             // Read the start byte
385             if (!inMessage) // skip until 0x7E
386             {
387                 loopUntilByteIsRead(in, 0x7e);
388
389                 inMessage = true;
390                 messageBufferLength = 0;
391             }
392             logger.trace("CaddxCommunicator.handleBinaryProtocol() Got start byte");
393
394             // Read the message length
395             if (messageBufferLength == 0) {
396                 int b = readByte(in);
397                 messageBufferLength = b + 2; // add two bytes for the checksum
398                 message = new byte[messageBufferLength];
399                 messageBufferIndex = 0;
400             }
401             logger.trace("CaddxCommunicator.handleBinaryProtocol() Got message length {}", messageBufferLength);
402
403             // Read the message
404             do {
405                 int b = readByte(in);
406                 message[messageBufferIndex] = (byte) b;
407
408                 if (message[messageBufferIndex] == 0x7D) {
409                     unStuff = true;
410                     continue;
411                 }
412
413                 if (unStuff) {
414                     message[messageBufferIndex] |= 0x20;
415                     unStuff = false;
416                 }
417
418                 messageBufferIndex++;
419             } while (messageBufferIndex < messageBufferLength);
420
421             // Offer the message
422             offerCaddxMessage();
423
424             logger.trace("CaddxCommunicator.handleBinaryProtocol() Got message {}", message[0]);
425         } catch (EOFException e) {
426             return;
427         } catch (IOException e) {
428         } catch (InterruptedException e) {
429             logger.trace("InterruptedException caught.");
430             Thread.currentThread().interrupt();
431         }
432
433         // Initialize state for a new reception
434         inMessage = false;
435         messageBufferLength = 0;
436         messageBufferIndex = 0;
437         unStuff = false;
438     }
439
440     private void receiveInAsciiProtocol(SerialPortEvent serialPortEvent) {
441         try {
442             // Read the start byte
443             if (!inMessage) {
444                 loopUntilByteIsRead(in, 0x0a);
445
446                 inMessage = true;
447                 haveFirstByte = false;
448                 messageBufferLength = 0;
449             }
450             logger.trace("CaddxCommunicator.handleAsciiProtocol() Got start byte");
451
452             // Read the message length
453             if (messageBufferLength == 0) {
454                 int b = readAsciiByte(in);
455                 messageBufferLength = b + 2; // add 2 bytes for the checksum
456                 message = new byte[messageBufferLength];
457             }
458             logger.trace("CaddxCommunicator.handleAsciiProtocol() Got message length {}", messageBufferLength);
459
460             // Read the message
461             do {
462                 int b = readAsciiByte(in);
463                 message[messageBufferIndex] = (byte) b;
464                 messageBufferIndex++;
465             } while (messageBufferIndex < messageBufferLength);
466
467             // Offer the message
468             offerCaddxMessage();
469
470             logger.trace("CaddxCommunicator.handleAsciiProtocol() Got message {}", message[0]);
471         } catch (EOFException e) {
472             return;
473         } catch (IOException e) {
474         } catch (InterruptedException e) {
475             logger.trace("InterruptedException caught.");
476             Thread.currentThread().interrupt();
477         }
478
479         // Initialize state for a new reception
480         inMessage = false;
481         messageBufferLength = 0;
482         messageBufferIndex = 0;
483     }
484 }