2 * Copyright (c) 2010-2023 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.plugwise.internal;
15 import static org.openhab.binding.plugwise.internal.PlugwiseCommunicationContext.*;
16 import static org.openhab.binding.plugwise.internal.protocol.field.MessageType.NETWORK_STATUS_REQUEST;
18 import java.io.IOException;
19 import java.nio.ByteBuffer;
20 import java.nio.channels.Channels;
21 import java.nio.channels.WritableByteChannel;
22 import java.util.concurrent.TimeUnit;
24 import org.eclipse.jdt.annotation.NonNullByDefault;
25 import org.eclipse.jdt.annotation.Nullable;
26 import org.openhab.binding.plugwise.internal.protocol.AcknowledgementMessage;
27 import org.openhab.binding.plugwise.internal.protocol.Message;
28 import org.openhab.core.io.transport.serial.SerialPort;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
33 * Sends messages to the Plugwise Stick using a serial connection.
35 * @author Wouter Born, Karel Goderis - Initial contribution
38 public class PlugwiseMessageSender {
/**
 * Thread that forwards queued messages: it takes entries from the context's send queue, passes each one to
 * {@code sendMessage(PlugwiseQueuedMessage)} of the enclosing sender and then sleeps for
 * {@code messageWaitTime} milliseconds.
 */
40 private class MessageSenderThread extends Thread {
/** Pause in milliseconds made via {@code sleep} after a message has been handed to {@code sendMessage}. */
42 private int messageWaitTime;
/**
 * Creates the thread; its name is built from the bridge UID reported by the context.
 *
 * @param messageWaitTime pause in milliseconds made after each message
 */
44 public MessageSenderThread(int messageWaitTime) {
45 super("OH-binding-" + context.getBridgeUID() + "-message-sender");
46 this.messageWaitTime = messageWaitTime;
// Keep going until this thread is interrupted; note that interrupted() also clears the interrupt flag
52 while (!interrupted()) {
// NOTE(review): presumably take() blocks until an entry is available (BlockingQueue-style) - confirm
// against the queue type returned by getSendQueue()
54 PlugwiseQueuedMessage queuedMessage = context.getSendQueue().take();
55 logger.debug("Took message from sendQueue (length={})", context.getSendQueue().size());
// Guard against a null entry before sending
56 if (queuedMessage == null) {
// Send the message, then pause before the next one is taken from the queue
59 sendMessage(queuedMessage);
60 sleep(messageWaitTime);
61 } catch (InterruptedException e) {
62 // That's our signal to stop
64 } catch (Exception e) {
// Log any other failure together with its stack trace
65 logger.warn("Error while polling/sending message", e);
71 /** Default maximum number of attempts to send a message */
72 private static final int MAX_RETRIES = 1;
74 /** After exceeding this threshold the Stick is set offline */
75 private static final int MAX_SEQUENTIAL_WRITE_ERRORS = 15;
77 private final Logger logger = LoggerFactory.getLogger(PlugwiseMessageSender.class);
78 private final PlugwiseCommunicationContext context;
80 private int sequentialWriteErrors;
82 private @Nullable WritableByteChannel outputChannel;
83 private @Nullable MessageSenderThread thread;
/**
 * Creates a message sender that operates on the given communication context.
 *
 * @param context provides the send/sent/acknowledged queues, the configuration and the serial port used by
 *            this sender
 */
public PlugwiseMessageSender(PlugwiseCommunicationContext context) {
    this.context = context;
}
89 public void sendMessage(Message message, PlugwiseMessagePriority priority) throws IOException {
90 if (sequentialWriteErrors > MAX_SEQUENTIAL_WRITE_ERRORS) {
91 throw new IOException("Error writing to serial port " + context.getConfiguration().getSerialPort() + " ("
92 + sequentialWriteErrors + " times)");
95 logger.debug("Adding {} message to sendQueue: {}", priority, message);
96 context.getSendQueue().put(new PlugwiseQueuedMessage(message, priority));
/**
 * Writes one queued message to the serial output channel and then polls the acknowledged queue for up to one
 * second. A message whose acknowledgement reports success receives the acknowledged sequence number and is
 * placed in the sent queue.
 * <p>
 * Messages that already have {@code MAX_RETRIES} or more recorded attempts are not written; a "Giving up"
 * warning is logged for them instead.
 *
 * @param queuedMessage the queue entry to transmit
 * @throws InterruptedException declared by this method; NOTE(review): presumably raised by the timed poll on
 *             the acknowledged queue - confirm
 */
99 private void sendMessage(PlugwiseQueuedMessage queuedMessage) throws InterruptedException {
// Only entries with fewer than MAX_RETRIES recorded attempts are sent; the attempt is counted up front
100 if (queuedMessage.getAttempts() < MAX_RETRIES) {
101 queuedMessage.increaseAttempts();
103 Message message = queuedMessage.getMessage();
104 String messageHexString = message.toHexString();
// Copy the nullable field into a local so the null check and the later write use the same reference
106 WritableByteChannel localOutputChannel = outputChannel;
107 if (localOutputChannel == null) {
108 logger.warn("Error writing '{}' to serial port {}: outputChannel is null", messageHexString,
109 context.getConfiguration().getSerialPort());
// A missing channel counts as a write error
110 sequentialWriteErrors++;
// Frame the hex payload with the protocol header and trailer.
// NOTE(review): getBytes() uses the platform default charset while the buffer is sized from
// packetString.length() (chars, not bytes) - confirm the packet only ever contains single-byte characters
114 String packetString = PROTOCOL_HEADER + messageHexString + PROTOCOL_TRAILER;
115 ByteBuffer bytebuffer = ByteBuffer.allocate(packetString.length());
116 bytebuffer.put(packetString.getBytes());
120 logger.debug("Sending: {} as {}", message, messageHexString);
// NOTE(review): write() starts at the buffer's current position - confirm the buffer is rewound/flipped
// after put() before this call
121 localOutputChannel.write(bytebuffer);
// A successful write ends the current streak of write errors
122 sequentialWriteErrors = 0;
123 } catch (IOException e) {
124 logger.warn("Error writing '{}' to serial port {}: {}", messageHexString,
125 context.getConfiguration().getSerialPort(), e.getMessage());
126 sequentialWriteErrors++;
130 // Poll the acknowledgement message for at most 1 second, normally it is received within 75ms
131 AcknowledgementMessage ack = context.getAcknowledgedQueue().poll(1, TimeUnit.SECONDS);
132 logger.debug("Removing from acknowledgedQueue: {}", ack);
// Missing acknowledgement: same text for both log levels, the level depends on the message type
135 String logMsg = "Error sending: No ACK received after 1 second: {}";
136 if (NETWORK_STATUS_REQUEST.equals(message.getType())) {
137 // Log on debug because the Stick will be set offline anyhow
138 logger.debug(logMsg, messageHexString);
140 logger.warn(logMsg, messageHexString);
142 } else if (!ack.isSuccess()) {
144 logger.warn("Error sending: Negative ACK: {}", messageHexString);
147 // Update the sent message with the new sequence number
148 message.setSequenceNumber(ack.getSequenceNumber());
150 // Place the sent message in the sent queue
151 logger.debug("Adding to sentQueue: {}", message);
// NOTE(review): confirm that the matching unlock() below is executed from a finally block
152 context.getSentQueueLock().lock();
154 if (context.getSentQueue().size() == PlugwiseCommunicationContext.MAX_BUFFER_SIZE) {
155 // For some reason Plugwise devices, or the Stick, do not send responses to Requests.
156 // They clog the sent queue, so one entry is removed from a full queue before the new one is added
157 PlugwiseQueuedMessage someMessage = context.getSentQueue().poll();
158 logger.debug("Flushing from sentQueue: {}", someMessage);
160 context.getSentQueue().put(queuedMessage);
162 context.getSentQueueLock().unlock();
166 // Max attempts reached. We give up and log a warning. NOTE(review): the previous wording mentioned a network reset, but no reset is visible in this method - confirm where it happens
167 logger.warn("Giving up on Plugwise message after {} attempts: {}", queuedMessage.getAttempts(),
168 queuedMessage.getMessage());
/**
 * Prepares this sender for use: wraps the serial port's output stream in a channel, resets the sequential
 * write error counter and creates the message sender thread.
 *
 * @throws PlugwiseInitializationException if the context has no serial port or the port's output stream
 *             cannot be obtained
 */
// NOTE(review): presumably the resource warning is suppressed because the channel is kept in a field and
// closed elsewhere in this class - confirm
172 @SuppressWarnings("resource")
173 public void start() throws PlugwiseInitializationException {
174 SerialPort serialPort = context.getSerialPort();
// Without a serial port there is nothing to write to
175 if (serialPort == null) {
176 throw new PlugwiseInitializationException("Failed to get serial port output stream because port is null");
// Wrap the port's output stream so messages can be written as ByteBuffers
180 outputChannel = Channels.newChannel(serialPort.getOutputStream());
181 } catch (IOException e) {
// Keep the IOException as the cause of the initialization failure
182 throw new PlugwiseInitializationException("Failed to get serial port output stream", e);
// Start with a clean error count
185 sequentialWriteErrors = 0;
// The pause between messages comes from the binding configuration.
// NOTE(review): confirm the newly created thread is started after this assignment
186 thread = new MessageSenderThread(context.getConfiguration().getMessageWaitTime());
191 PlugwiseUtils.stopBackgroundThread(thread);
192 if (outputChannel != null) {
194 outputChannel.close();
195 outputChannel = null;
196 } catch (IOException e) {
197 logger.warn("Failed to close output channel", e);