2 * Copyright (c) 2010-2023 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.plugwise.internal;
15 import static org.openhab.binding.plugwise.internal.PlugwiseCommunicationContext.*;
17 import java.io.IOException;
18 import java.io.InputStream;
19 import java.nio.ByteBuffer;
20 import java.util.Iterator;
21 import java.util.TooManyListenersException;
22 import java.util.regex.Matcher;
23 import java.util.regex.Pattern;
25 import org.apache.commons.lang3.StringUtils;
26 import org.eclipse.jdt.annotation.NonNullByDefault;
27 import org.eclipse.jdt.annotation.Nullable;
28 import org.openhab.binding.plugwise.internal.protocol.AcknowledgementMessage;
29 import org.openhab.binding.plugwise.internal.protocol.Message;
30 import org.openhab.binding.plugwise.internal.protocol.MessageFactory;
31 import org.openhab.binding.plugwise.internal.protocol.field.MessageType;
32 import org.openhab.core.io.transport.serial.SerialPort;
33 import org.openhab.core.io.transport.serial.SerialPortEvent;
34 import org.openhab.core.io.transport.serial.SerialPortEventListener;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
39 * Processes messages received from the Plugwise Stick using a serial connection.
41 * @author Wouter Born, Karel Goderis - Initial contribution
44 public class PlugwiseMessageProcessor implements SerialPortEventListener {
// Worker thread that takes messages from the context's receivedQueue and passes each one to processMessage().
46 private class MessageProcessorThread extends Thread {
// The thread name embeds the bridge UID obtained from the communication context.
48 public MessageProcessorThread() {
49 super("OH-binding-" + context.getBridgeUID() + "-message-processor");
// Keep processing until the interrupt flag of this thread is set.
55 while (!interrupted()) {
// take() may be interrupted while waiting; see the InterruptedException handler below.
57 Message message = context.getReceivedQueue().take();
58 if (message != null) {
59 logger.debug("Took message from receivedQueue (length={})", context.getReceivedQueue().size());
60 processMessage(message);
// Branch for a null message: nothing is processed, the skip is only logged.
62 logger.debug("Skipping null message from receivedQueue (length={})",
63 context.getReceivedQueue().size());
65 } catch (InterruptedException e) {
66 // That's our signal to stop
// Any other exception is logged with its stack trace.
68 } catch (Exception e) {
69 logger.warn("Error while taking message from receivedQueue", e);
75 /** Matches Plugwise responses into the following groups: protocolHeader command sequence payload CRC */
// Group 1 accepts any four characters; groups 2, 3 and 5 are four word characters each; group 4 (payload) is a
// reluctant run of word characters, so the last four word characters always end up in the CRC group.
76 private static final Pattern RESPONSE_PATTERN = Pattern.compile("(.{4})(\\w{4})(\\w{4})(\\w*?)(\\w{4})");
// Logger of this class.
78 private final Logger logger = LoggerFactory.getLogger(PlugwiseMessageProcessor.class);
// Communication context that provides the serial port, the queues, the listeners and the sentQueue lock.
79 private final PlugwiseCommunicationContext context;
// Factory used by parseAndQueue() to create Message instances from the parsed fields.
80 private final MessageFactory messageFactory = new MessageFactory();
// Collects the bytes accepted by serialEvent() until a CR LF pair is seen; capacity is MAX_BUFFER_SIZE.
82 private final ByteBuffer readBuffer = ByteBuffer.allocate(PlugwiseCommunicationContext.MAX_BUFFER_SIZE);
// Last byte accepted by serialEvent(), compared against CR to detect the CR LF terminator; starts at -1.
83 private int previousByte = -1;
// Processor thread assigned in start() and passed to PlugwiseUtils.stopBackgroundThread(); null until assigned.
85 private @Nullable MessageProcessorThread thread;
// Creates a message processor that operates on the given communication context.
// @param context the context whose serial port, queues, listeners and lock are used by this processor
87 public PlugwiseMessageProcessor(PlugwiseCommunicationContext context) {
88 this.context = context;
92 * Parse a buffer into a Message and put it in the appropriate queue for further processing
94 * @param readBuffer - the string to parse
96 private void parseAndQueue(ByteBuffer readBuffer) {
97 String response = new String(readBuffer.array(), 0, readBuffer.limit());
98 response = StringUtils.chomp(response);
100 Matcher matcher = RESPONSE_PATTERN.matcher(response);
102 if (matcher.matches()) {
103 String protocolHeader = matcher.group(1);
104 String messageTypeHex = matcher.group(2);
105 String sequence = matcher.group(3);
106 String payload = matcher.group(4);
107 String crc = matcher.group(5);
109 if (protocolHeader.equals(PROTOCOL_HEADER)) {
110 String calculatedCRC = Message.getCRC(messageTypeHex + sequence + payload);
111 if (calculatedCRC.equals(crc)) {
112 MessageType messageType = MessageType.forValue(Integer.parseInt(messageTypeHex, 16));
113 int sequenceNumber = Integer.parseInt(sequence, 16);
115 if (messageType == null) {
116 logger.debug("Received unrecognized message: messageTypeHex=0x{}, sequence={}, payload={}",
117 messageTypeHex, sequenceNumber, payload);
121 logger.debug("Received message: messageType={}, sequenceNumber={}, payload={}", messageType,
122 sequenceNumber, payload);
125 Message message = messageFactory.createMessage(messageType, sequenceNumber, payload);
127 if (message instanceof AcknowledgementMessage
128 && !((AcknowledgementMessage) message).isExtended()) {
129 logger.debug("Adding to acknowledgedQueue: {}", message);
130 context.getAcknowledgedQueue().put((AcknowledgementMessage) message);
132 logger.debug("Adding to receivedQueue: {}", message);
133 context.getReceivedQueue().put(message);
135 } catch (IllegalArgumentException e) {
136 logger.warn("Failed to create message", e);
137 } catch (InterruptedException e) {
138 Thread.interrupted();
141 logger.warn("Plugwise protocol CRC error: {} does not match {} in message", calculatedCRC, crc);
144 logger.debug("Plugwise protocol header error: {} in message {}", protocolHeader, response);
146 } else if (!response.contains("APSRequestNodeInfo") && !response.contains("APSSetSleepBehaviour")
147 && !response.startsWith("# ")) {
148 logger.warn("Plugwise protocol message error: {}", response);
152 private void processMessage(Message message) {
153 context.getFilteredListeners().notifyListeners(message);
155 // After processing the response to a message, we remove any reference to the original request
156 // stored in the sentQueue
157 // WARNING: We assume that each request sent out can only be followed bye EXACTLY ONE response - so
158 // far it seems that the Plugwise protocol is operating in that way
161 context.getSentQueueLock().lock();
163 Iterator<@Nullable PlugwiseQueuedMessage> messageIterator = context.getSentQueue().iterator();
164 while (messageIterator.hasNext()) {
165 PlugwiseQueuedMessage queuedSentMessage = messageIterator.next();
166 if (queuedSentMessage != null
167 && queuedSentMessage.getMessage().getSequenceNumber() == message.getSequenceNumber()) {
168 logger.debug("Removing from sentQueue: {}", queuedSentMessage.getMessage());
169 context.getSentQueue().remove(queuedSentMessage);
174 context.getSentQueueLock().unlock();
178 @SuppressWarnings("resource")
180 public void serialEvent(@Nullable SerialPortEvent event) {
181 if (event != null && event.getEventType() == SerialPortEvent.DATA_AVAILABLE) {
182 // We get here if data has been received
183 SerialPort serialPort = context.getSerialPort();
184 if (serialPort == null) {
185 logger.debug("Failed to read available data from null serialPort");
190 InputStream inputStream = serialPort.getInputStream();
191 if (inputStream == null) {
192 logger.debug("Failed to read available data from null inputStream");
196 // Read data from serial device
197 while (inputStream.available() > 0) {
198 int currentByte = inputStream.read();
199 // Plugwise sends ASCII data, but for some unknown reason we sometimes get data with unsigned
200 // byte value >127 which in itself is very strange. We filter these out for the time being
201 if (currentByte < 128) {
202 readBuffer.put((byte) currentByte);
203 if (previousByte == CR && currentByte == LF) {
205 parseAndQueue(readBuffer);
209 previousByte = currentByte;
213 } catch (IOException e) {
214 logger.debug("Error receiving data on serial port {}: {}", context.getConfiguration().getSerialPort(),
// Registers this processor as event listener on the serial port of the context and creates the message
// processor thread.
// Throws PlugwiseInitializationException when the context has no serial port or when the listener cannot be
// added (TooManyListenersException is wrapped as the cause).
220 @SuppressWarnings("resource")
221 public void start() throws PlugwiseInitializationException {
222 SerialPort serialPort = context.getSerialPort();
223 if (serialPort == null) {
224 throw new PlugwiseInitializationException("Failed to add serial port listener because port is null");
// Only this processor is added as listener; a failure aborts the start.
228 serialPort.addEventListener(this);
229 } catch (TooManyListenersException e) {
230 throw new PlugwiseInitializationException("Failed to add serial port listener", e);
// NOTE(review): the statement that starts the new thread is not visible in this excerpt - confirm it follows.
233 thread = new MessageProcessorThread();
// NOTE(review): the method header is not visible in this excerpt; presumably this is the counterpart of
// start() - confirm against the full file.
237 @SuppressWarnings("resource")
// Hands the processor thread (possibly null) to PlugwiseUtils.stopBackgroundThread.
239 PlugwiseUtils.stopBackgroundThread(thread);
// The event listener is only removed when the context still provides a serial port.
241 SerialPort serialPort = context.getSerialPort();
242 if (serialPort != null) {
243 serialPort.removeEventListener();