]> git.basschouten.com Git - openhab-addons.git/blob
614d2a27cb895393c99df5df6d267806192da161
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2023 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.nibeheatpump.internal.connection;
14
15 import java.io.ByteArrayOutputStream;
16 import java.io.IOException;
17 import java.io.InputStream;
18 import java.io.InterruptedIOException;
19 import java.io.OutputStream;
20 import java.util.ArrayList;
21 import java.util.List;
22 import java.util.TooManyListenersException;
23
24 import org.openhab.binding.nibeheatpump.internal.NibeHeatPumpException;
25 import org.openhab.binding.nibeheatpump.internal.config.NibeHeatPumpConfiguration;
26 import org.openhab.binding.nibeheatpump.internal.message.MessageFactory;
27 import org.openhab.binding.nibeheatpump.internal.message.ModbusReadRequestMessage;
28 import org.openhab.binding.nibeheatpump.internal.message.ModbusWriteRequestMessage;
29 import org.openhab.binding.nibeheatpump.internal.message.NibeHeatPumpMessage;
30 import org.openhab.binding.nibeheatpump.internal.protocol.NibeHeatPumpProtocol;
31 import org.openhab.binding.nibeheatpump.internal.protocol.NibeHeatPumpProtocolContext;
32 import org.openhab.binding.nibeheatpump.internal.protocol.NibeHeatPumpProtocolDefaultContext;
33 import org.openhab.core.io.transport.serial.PortInUseException;
34 import org.openhab.core.io.transport.serial.SerialPort;
35 import org.openhab.core.io.transport.serial.SerialPortEvent;
36 import org.openhab.core.io.transport.serial.SerialPortEventListener;
37 import org.openhab.core.io.transport.serial.SerialPortIdentifier;
38 import org.openhab.core.io.transport.serial.SerialPortManager;
39 import org.openhab.core.io.transport.serial.UnsupportedCommOperationException;
40 import org.openhab.core.util.HexUtils;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
43
44 /**
45  * Connector for serial port communication.
46  *
47  * @author Pauli Anttila - Initial contribution
48  */
49 public class SerialConnector extends NibeHeatPumpBaseConnector {
50
51     private final Logger logger = LoggerFactory.getLogger(SerialConnector.class);
52
53     private InputStream in;
54     private OutputStream out;
55     private SerialPort serialPort;
56     private final SerialPortManager serialPortManager;
57     private Thread readerThread;
58     private NibeHeatPumpConfiguration conf;
59
60     private final List<byte[]> readQueue = new ArrayList<>();
61     private final List<byte[]> writeQueue = new ArrayList<>();
62
63     public SerialConnector(SerialPortManager serialPortManager) {
64         logger.debug("Nibe heatpump Serial Port message listener created");
65         this.serialPortManager = serialPortManager;
66     }
67
68     @Override
69     public void connect(NibeHeatPumpConfiguration configuration) throws NibeHeatPumpException {
70         if (isConnected()) {
71             return;
72         }
73
74         conf = configuration;
75         try {
76             SerialPortIdentifier portIdentifier = serialPortManager.getIdentifier(conf.serialPort);
77             if (portIdentifier == null) {
78                 throw new NibeHeatPumpException("Connection failed, no such port: " + conf.serialPort);
79             }
80
81             serialPort = portIdentifier.open(this.getClass().getName(), 2000);
82             serialPort.setSerialPortParams(9600, SerialPort.DATABITS_8, SerialPort.STOPBITS_1, SerialPort.PARITY_NONE);
83
84             serialPort.enableReceiveThreshold(1);
85             serialPort.disableReceiveTimeout();
86
87             in = serialPort.getInputStream();
88             out = serialPort.getOutputStream();
89
90             out.flush();
91             if (in.markSupported()) {
92                 in.reset();
93             }
94         } catch (PortInUseException e) {
95             throw new NibeHeatPumpException("Connection failed, port in use: " + conf.serialPort, e);
96         } catch (UnsupportedCommOperationException | IOException e) {
97             throw new NibeHeatPumpException("Connection failed, reason: " + e.getMessage(), e);
98         }
99
100         readQueue.clear();
101         writeQueue.clear();
102
103         readerThread = new SerialReader(in);
104         readerThread.start();
105         connected = true;
106     }
107
108     @Override
109     public void disconnect() {
110         logger.debug("Disconnecting");
111         if (serialPort != null) {
112             serialPort.removeEventListener();
113         }
114         if (readerThread != null) {
115             logger.debug("Interrupt serial listener");
116             readerThread.interrupt();
117         }
118         if (out != null) {
119             logger.debug("Close serial out stream");
120             try {
121                 out.close();
122             } catch (IOException e) {
123                 logger.debug("Error while closing the output stream: {}", e.getMessage());
124             }
125         }
126         if (in != null) {
127             logger.debug("Close serial in stream");
128             try {
129                 in.close();
130             } catch (IOException e) {
131                 logger.debug("Error while closing the input stream: {}", e.getMessage());
132             }
133         }
134         if (serialPort != null) {
135             logger.debug("Close serial port");
136             serialPort.close();
137         }
138         readerThread = null;
139         serialPort = null;
140         out = null;
141         in = null;
142         connected = false;
143         logger.debug("Closed");
144     }
145
146     @Override
147     public void sendDatagram(NibeHeatPumpMessage msg) {
148         if (logger.isTraceEnabled()) {
149             logger.trace("Add request to queue: {}", msg.toHexString());
150         }
151
152         if (msg instanceof ModbusWriteRequestMessage) {
153             writeQueue.add(msg.decodeMessage());
154         } else if (msg instanceof ModbusReadRequestMessage) {
155             readQueue.add(msg.decodeMessage());
156         } else {
157             logger.trace("Ignore PDU: {}", msg.getClass());
158         }
159
160         logger.trace("Read queue: {}, Write queue: {}", readQueue.size(), writeQueue.size());
161     }
162
163     public class SerialReader extends Thread implements SerialPortEventListener {
164         boolean interrupted = false;
165         final InputStream in;
166
167         SerialReader(InputStream in) {
168             this.in = in;
169         }
170
171         @Override
172         public void interrupt() {
173             interrupted = true;
174             super.interrupt();
175         }
176
177         @Override
178         public void run() {
179             logger.debug("Data listener started");
180
181             // RXTX serial port library causes high CPU load
182             // Start event listener, which will just sleep and slow down event loop
183             try {
184                 serialPort.addEventListener(this);
185                 serialPort.notifyOnDataAvailable(true);
186             } catch (TooManyListenersException e) {
187                 logger.info("RXTX high CPU load workaround failed, reason {}", e.getMessage());
188             }
189
190             NibeHeatPumpProtocolContext context = new NibeHeatPumpProtocolDefaultContext() {
191                 @Override
192                 public void sendAck() {
193                     try {
194                         byte addr = msg().get(NibeHeatPumpProtocol.RES_OFFS_ADR);
195                         sendAckToNibe(addr);
196                     } catch (IOException e) {
197                         sendErrorToListeners(e.getMessage());
198                     }
199                 }
200
201                 @Override
202                 public void sendNak() {
203                     // do nothing
204                 }
205
206                 @Override
207                 public void msgReceived(byte[] data) {
208                     sendMsgToListeners(data);
209                 }
210
211                 @Override
212                 public void sendWriteMsg() {
213                     try {
214                         if (!writeQueue.isEmpty()) {
215                             sendDataToNibe(writeQueue.remove(0));
216                         } else {
217                             // no messages to send, send ack to pump
218                             byte addr = msg().get(NibeHeatPumpProtocol.RES_OFFS_ADR);
219                             sendAckToNibe(addr);
220                         }
221                     } catch (IOException e) {
222                         sendErrorToListeners(e.getMessage());
223                     }
224                 }
225
226                 @Override
227                 public void sendReadMsg() {
228                     try {
229                         if (!readQueue.isEmpty()) {
230                             sendDataToNibe(readQueue.remove(0));
231                         } else {
232                             // no messages to send, send ack to pump
233                             byte addr = msg().get(NibeHeatPumpProtocol.RES_OFFS_ADR);
234                             sendAckToNibe(addr);
235                         }
236                     } catch (IOException e) {
237                         sendErrorToListeners(e.getMessage());
238                     }
239                 }
240             };
241
242             while (!interrupted) {
243                 try {
244                     final byte[] data = getAllAvailableBytes(in);
245                     if (data != null) {
246                         context.buffer().put(data);
247
248                         // flip buffer for reading
249                         context.buffer().flip();
250                     }
251                 } catch (InterruptedIOException e) {
252                     Thread.currentThread().interrupt();
253                     logger.debug("Interrupted via InterruptedIOException");
254                 } catch (IOException e) {
255                     logger.error("Reading from serial port failed", e);
256                     sendErrorToListeners(e.getMessage());
257                 } catch (Exception e) {
258                     logger.debug("Error occurred during serial port read", e);
259                 }
260
261                 // run state machine to process all received data
262                 while (context.state().process(context)) {
263                     if (interrupted) {
264                         break;
265                     }
266                 }
267
268                 // all bytes should be handled, clear buffer for next round
269                 context.buffer().clear();
270             }
271
272             logger.debug("Data listener stopped");
273         }
274
275         private byte[] getAllAvailableBytes(InputStream in) throws IOException {
276             ByteArrayOutputStream os = new ByteArrayOutputStream();
277
278             int b;
279             // wait first byte (blocking)
280             if ((b = in.read()) > -1) {
281                 byte[] d = new byte[] { (byte) b };
282                 os.write(d);
283
284                 // read rest of the available bytes
285                 final int bufferLen = 100;
286                 byte[] buffer = new byte[bufferLen];
287                 int available = in.available();
288                 if (available > 0) {
289                     int len = in.read(buffer, 0, bufferLen);
290                     if (len > -1) {
291                         os.write(buffer, 0, len);
292                     }
293                 }
294
295                 os.flush();
296                 return os.toByteArray();
297             }
298
299             return null;
300         }
301
302         @Override
303         public void serialEvent(SerialPortEvent event) {
304             try {
305                 /*
306                  * See more details from
307                  * https://github.com/NeuronRobotics/nrjavaserial/issues/22
308                  */
309                 logger.trace("RXTX library CPU load workaround, sleep forever");
310                 sleep(Long.MAX_VALUE);
311             } catch (InterruptedException e) {
312             }
313         }
314     }
315
316     @SuppressWarnings("unused")
317     private void sendNakToNibe() throws IOException {
318         logger.trace("Send data (len=1): 15");
319         out.write(0x15);
320         out.flush();
321     }
322
323     private void sendAckToNibe(byte address) throws IOException {
324         boolean sendack = false;
325
326         if (address == NibeHeatPumpProtocol.ADR_MODBUS40 && conf.sendAckToMODBUS40) {
327             logger.debug("Send ack to MODBUS40 message");
328             sendack = true;
329         } else if (address == NibeHeatPumpProtocol.ADR_SMS40 && conf.sendAckToSMS40) {
330             logger.debug("Send ack to SMS40 message");
331             sendack = true;
332         } else if (address == NibeHeatPumpProtocol.ADR_RMU40 && conf.sendAckToRMU40) {
333             logger.debug("Send ack to RMU40 message");
334             sendack = true;
335         }
336
337         if (sendack) {
338             sendAckToNibe();
339         }
340     }
341
342     private void sendAckToNibe() throws IOException {
343         logger.trace("Send data (len=1): 06");
344         out.write(0x06);
345         out.flush();
346     }
347
348     private void sendDataToNibe(byte[] data) throws IOException {
349         if (logger.isTraceEnabled()) {
350             try {
351                 NibeHeatPumpMessage msg = MessageFactory.getMessage(data);
352                 logger.trace("Sending msg: {}", msg);
353             } catch (NibeHeatPumpException e) {
354                 // do nothing
355             }
356             logger.trace("Sending data (len={}): {}", data.length, HexUtils.bytesToHex(data));
357         }
358         out.write(data);
359         out.flush();
360     }
361 }