2 * Copyright (c) 2010-2023 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.nibeheatpump.internal.connection;
15 import java.io.ByteArrayOutputStream;
16 import java.io.IOException;
17 import java.io.InputStream;
18 import java.io.InterruptedIOException;
19 import java.io.OutputStream;
20 import java.util.ArrayList;
21 import java.util.List;
22 import java.util.TooManyListenersException;
24 import org.openhab.binding.nibeheatpump.internal.NibeHeatPumpException;
25 import org.openhab.binding.nibeheatpump.internal.config.NibeHeatPumpConfiguration;
26 import org.openhab.binding.nibeheatpump.internal.message.MessageFactory;
27 import org.openhab.binding.nibeheatpump.internal.message.ModbusReadRequestMessage;
28 import org.openhab.binding.nibeheatpump.internal.message.ModbusWriteRequestMessage;
29 import org.openhab.binding.nibeheatpump.internal.message.NibeHeatPumpMessage;
30 import org.openhab.binding.nibeheatpump.internal.protocol.NibeHeatPumpProtocol;
31 import org.openhab.binding.nibeheatpump.internal.protocol.NibeHeatPumpProtocolContext;
32 import org.openhab.binding.nibeheatpump.internal.protocol.NibeHeatPumpProtocolDefaultContext;
33 import org.openhab.core.io.transport.serial.PortInUseException;
34 import org.openhab.core.io.transport.serial.SerialPort;
35 import org.openhab.core.io.transport.serial.SerialPortEvent;
36 import org.openhab.core.io.transport.serial.SerialPortEventListener;
37 import org.openhab.core.io.transport.serial.SerialPortIdentifier;
38 import org.openhab.core.io.transport.serial.SerialPortManager;
39 import org.openhab.core.io.transport.serial.UnsupportedCommOperationException;
40 import org.openhab.core.util.HexUtils;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
45 * Connector for serial port communication.
47 * @author Pauli Anttila - Initial contribution
49 public class SerialConnector extends NibeHeatPumpBaseConnector {
51 private final Logger logger = LoggerFactory.getLogger(SerialConnector.class);
53 private InputStream in;
54 private OutputStream out;
55 private SerialPort serialPort;
56 private final SerialPortManager serialPortManager;
57 private Thread readerThread;
58 private NibeHeatPumpConfiguration conf;
60 private final List<byte[]> readQueue = new ArrayList<>();
61 private final List<byte[]> writeQueue = new ArrayList<>();
/**
 * Creates a connector bound to the given serial port manager.
 *
 * @param serialPortManager manager that connect() queries for the port identifier
 */
public SerialConnector(SerialPortManager serialPortManager) {
    this.serialPortManager = serialPortManager;
    logger.debug("Nibe heatpump Serial Port message listener created");
}
/**
 * Opens the serial port and starts the reader thread.
 * Visible steps: look up the port identifier by name, open the port, set the line
 * parameters, fetch both streams, then create and start a SerialReader on the input stream.
 *
 * @param configuration binding configuration supplied by the caller
 * @throws NibeHeatPumpException if the port name is unknown, the port is already in use,
 *         or configuring the port / obtaining the streams fails
 */
69 public void connect(NibeHeatPumpConfiguration configuration) throws NibeHeatPumpException {
76 SerialPortIdentifier portIdentifier = serialPortManager.getIdentifier(conf.serialPort);
// a null identifier means the manager does not know the configured port name
77 if (portIdentifier == null) {
78 throw new NibeHeatPumpException("Connection failed, no such port: " + conf.serialPort);
// this class name is passed as the owner; NOTE(review): 2000 is presumably the open timeout in ms - confirm
81 serialPort = portIdentifier.open(this.getClass().getName(), 2000);
// 9600 baud, 8 data bits, 1 stop bit, no parity
82 serialPort.setSerialPortParams(9600, SerialPort.DATABITS_8, SerialPort.STOPBITS_1, SerialPort.PARITY_NONE);
// NOTE(review): threshold 1 with the receive timeout disabled presumably makes reads
// block until at least one byte is available - confirm against the SerialPort API
84 serialPort.enableReceiveThreshold(1);
85 serialPort.disableReceiveTimeout();
87 in = serialPort.getInputStream();
88 out = serialPort.getOutputStream();
91 if (in.markSupported()) {
// every failure is rethrown as NibeHeatPumpException with the original exception as cause
94 } catch (PortInUseException e) {
95 throw new NibeHeatPumpException("Connection failed, port in use: " + conf.serialPort, e);
96 } catch (UnsupportedCommOperationException | IOException e) {
97 throw new NibeHeatPumpException("Connection failed, reason: " + e.getMessage(), e);
// the reader consumes the input stream on its own thread
103 readerThread = new SerialReader(in);
104 readerThread.start();
/**
 * Shuts the connection down step by step, logging each step at debug level:
 * detach the port event listener, interrupt the reader thread, then the
 * stream and port close steps. IOExceptions caught in the stream-closing steps are
 * logged at debug level and not propagated.
 */
109 public void disconnect() {
110 logger.debug("Disconnecting");
// the null checks make the steps safe when the port or reader was never created
111 if (serialPort != null) {
112 serialPort.removeEventListener();
114 if (readerThread != null) {
115 logger.debug("Interrupt serial listener");
116 readerThread.interrupt();
119 logger.debug("Close serial out stream");
122 } catch (IOException e) {
123 logger.debug("Error while closing the output stream: {}", e.getMessage());
127 logger.debug("Close serial in stream");
130 } catch (IOException e) {
131 logger.debug("Error while closing the input stream: {}", e.getMessage());
134 if (serialPort != null) {
135 logger.debug("Close serial port");
143 logger.debug("Closed");
147 public void sendDatagram(NibeHeatPumpMessage msg) {
148 if (logger.isTraceEnabled()) {
149 logger.trace("Add request to queue: {}", msg.toHexString());
152 if (msg instanceof ModbusWriteRequestMessage) {
153 writeQueue.add(msg.decodeMessage());
154 } else if (msg instanceof ModbusReadRequestMessage) {
155 readQueue.add(msg.decodeMessage());
157 logger.trace("Ignore PDU: {}", msg.getClass());
160 logger.trace("Read queue: {}, Write queue: {}", readQueue.size(), writeQueue.size());
/**
 * Thread that reads from the serial input stream. It also implements
 * SerialPortEventListener so that it can register itself on the serial port.
 */
163 public class SerialReader extends Thread implements SerialPortEventListener {
// loop flag tested by the read loop (while (!interrupted)); initially false
164 boolean interrupted = false;
// stream this reader consumes
165 final InputStream in;
// receives the stream to read from
167 SerialReader(InputStream in) {
// overrides Thread.interrupt(); NOTE(review): presumably sets the interrupted flag - confirm
172 public void interrupt() {
// Reader loop body: registers this object as port event listener, builds the protocol
// context with its callbacks, then reads and processes data until the interrupted flag is set.
179 logger.debug("Data listener started");
181 // RXTX serial port library causes high CPU load
182 // Start event listener, which will just sleep and slow down event loop
184 serialPort.addEventListener(this);
185 serialPort.notifyOnDataAvailable(true);
// failing to register the listener is not fatal; it is only logged
186 } catch (TooManyListenersException e) {
187 logger.info("RXTX high CPU load workaround failed, reason {}", e.getMessage());
// Anonymous protocol context providing the callbacks below.
// NOTE(review): presumably these are invoked by the state machine run further down - confirm.
190 NibeHeatPumpProtocolContext context = new NibeHeatPumpProtocolDefaultContext() {
192 public void sendAck() {
// address byte of the current message, read at offset RES_OFFS_ADR
194 byte addr = msg().get(NibeHeatPumpProtocol.RES_OFFS_ADR);
// I/O failures are reported to the listeners as an error string
196 } catch (IOException e) {
197 sendErrorToListeners(e.getMessage());
202 public void sendNak() {
// hands the received bytes to the registered listeners
207 public void msgReceived(byte[] data) {
208 sendMsgToListeners(data);
212 public void sendWriteMsg() {
// the oldest queued write request (index 0) is sent first
214 if (!writeQueue.isEmpty()) {
215 sendDataToNibe(writeQueue.remove(0));
217 // no messages to send, send ack to pump
218 byte addr = msg().get(NibeHeatPumpProtocol.RES_OFFS_ADR);
221 } catch (IOException e) {
222 sendErrorToListeners(e.getMessage());
227 public void sendReadMsg() {
// the oldest queued read request (index 0) is sent first
229 if (!readQueue.isEmpty()) {
230 sendDataToNibe(readQueue.remove(0));
232 // no messages to send, send ack to pump
233 byte addr = msg().get(NibeHeatPumpProtocol.RES_OFFS_ADR);
236 } catch (IOException e) {
237 sendErrorToListeners(e.getMessage());
// main loop: runs until the interrupted field becomes true
242 while (!interrupted) {
// getAllAvailableBytes waits for the first byte before returning
244 final byte[] data = getAllAvailableBytes(in);
246 context.buffer().put(data);
248 // flip buffer for reading
249 context.buffer().flip();
251 } catch (InterruptedIOException e) {
// restore the interrupt status of the current thread
252 Thread.currentThread().interrupt();
253 logger.debug("Interrupted via InterruptedIOException");
// read failures are logged at error level and also reported to the listeners
254 } catch (IOException e) {
255 logger.error("Reading from serial port failed", e);
256 sendErrorToListeners(e.getMessage());
// anything else is only logged at debug level
257 } catch (Exception e) {
258 logger.debug("Error occurred during serial port read", e);
261 // run state machine to process all received data
// keeps calling process() for as long as it returns true
262 while (context.state().process(context)) {
268 // all bytes should be handled, clear buffer for next round
269 context.buffer().clear();
272 logger.debug("Data listener stopped");
/**
 * Collects bytes from the stream into a byte array: waits for a first byte with a
 * blocking single-byte read, then performs a follow-up read of up to 100 bytes.
 *
 * @param in stream to read from
 * @return the contents accumulated in the ByteArrayOutputStream
 * @throws IOException if reading from the stream fails
 */
275 private byte[] getAllAvailableBytes(InputStream in) throws IOException {
276 ByteArrayOutputStream os = new ByteArrayOutputStream();
279 // wait first byte (blocking)
// read() returns -1 at end of stream, so only values above -1 are real data
280 if ((b = in.read()) > -1) {
// wrap the single byte in a one-element array
281 byte[] d = new byte[] { (byte) b };
284 // read rest of the available bytes
285 final int bufferLen = 100;
286 byte[] buffer = new byte[bufferLen];
// number of bytes the stream reports as readable without blocking
287 int available = in.available();
// len is the number of bytes actually read into buffer
289 int len = in.read(buffer, 0, bufferLen);
291 os.write(buffer, 0, len);
296 return os.toByteArray();
/**
 * Serial port event callback used only for the RXTX CPU-load workaround: it logs at
 * trace level and then sleeps for Long.MAX_VALUE milliseconds.
 *
 * @param event the serial port event; not used by the visible code
 */
303 public void serialEvent(SerialPortEvent event) {
306 * See more details from
307 * https://github.com/NeuronRobotics/nrjavaserial/issues/22
309 logger.trace("RXTX library CPU load workaround, sleep forever");
// sleep() here is Thread.sleep, so it blocks the thread that delivers the event
310 sleep(Long.MAX_VALUE);
311 } catch (InterruptedException e) {
/**
 * NAK sender; the trace message announces a single byte, 15.
 * NOTE(review): 15 is presumably the hex value of the NAK byte - confirm.
 * Marked as unused, hence the warning suppression.
 *
 * @throws IOException declared by the method signature
 */
316 @SuppressWarnings("unused")
317 private void sendNakToNibe() throws IOException {
318 logger.trace("Send data (len=1): 15");
/**
 * Decides whether a message from the given address is acknowledged. Each known address
 * (MODBUS40, SMS40, RMU40) is paired with its own configuration flag
 * (sendAckToMODBUS40, sendAckToSMS40, sendAckToRMU40); a branch is taken only when
 * both the address matches and the flag is set.
 *
 * @param address address byte of the message being answered
 * @throws IOException declared by the method signature
 */
323 private void sendAckToNibe(byte address) throws IOException {
// starts out false, i.e. no ack unless a branch below matches
324 boolean sendack = false;
326 if (address == NibeHeatPumpProtocol.ADR_MODBUS40 && conf.sendAckToMODBUS40) {
327 logger.debug("Send ack to MODBUS40 message");
329 } else if (address == NibeHeatPumpProtocol.ADR_SMS40 && conf.sendAckToSMS40) {
330 logger.debug("Send ack to SMS40 message");
332 } else if (address == NibeHeatPumpProtocol.ADR_RMU40 && conf.sendAckToRMU40) {
333 logger.debug("Send ack to RMU40 message");
/**
 * Unconditional ACK sender; the trace message announces a single byte, 06.
 * NOTE(review): 06 is presumably the hex value of the ACK byte - confirm.
 *
 * @throws IOException declared by the method signature
 */
342 private void sendAckToNibe() throws IOException {
343 logger.trace("Send data (len=1): 06");
348 private void sendDataToNibe(byte[] data) throws IOException {
349 if (logger.isTraceEnabled()) {
351 NibeHeatPumpMessage msg = MessageFactory.getMessage(data);
352 logger.trace("Sending msg: {}", msg);
353 } catch (NibeHeatPumpException e) {
356 logger.trace("Sending data (len={}): {}", data.length, HexUtils.bytesToHex(data));