]> git.basschouten.com Git - openhab-addons.git/blob
8cefcb275355075cadac1dc9c4dd6984f1597b6d
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2023 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.plugwise.internal;
14
15 import static org.openhab.binding.plugwise.internal.PlugwiseCommunicationContext.*;
16 import static org.openhab.binding.plugwise.internal.protocol.field.MessageType.NETWORK_STATUS_REQUEST;
17
18 import java.io.IOException;
19 import java.nio.ByteBuffer;
20 import java.nio.channels.Channels;
21 import java.nio.channels.WritableByteChannel;
22 import java.util.concurrent.TimeUnit;
23
24 import org.eclipse.jdt.annotation.NonNullByDefault;
25 import org.eclipse.jdt.annotation.Nullable;
26 import org.openhab.binding.plugwise.internal.protocol.AcknowledgementMessage;
27 import org.openhab.binding.plugwise.internal.protocol.Message;
28 import org.openhab.core.io.transport.serial.SerialPort;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31
32 /**
33  * Sends messages to the Plugwise Stick using a serial connection.
34  *
35  * @author Wouter Born, Karel Goderis - Initial contribution
36  */
37 @NonNullByDefault
38 public class PlugwiseMessageSender {
39
40     private class MessageSenderThread extends Thread {
41
42         private final int messageWaitTime;
43
44         public MessageSenderThread(int messageWaitTime) {
45             super("OH-binding-" + context.getBridgeUID() + "-message-sender");
46             this.messageWaitTime = messageWaitTime;
47             setDaemon(true);
48         }
49
50         @Override
51         public void run() {
52             while (!interrupted()) {
53                 try {
54                     PlugwiseQueuedMessage queuedMessage = context.getSendQueue().take();
55                     logger.debug("Took message from sendQueue (length={})", context.getSendQueue().size());
56                     if (queuedMessage == null) {
57                         continue;
58                     }
59                     sendMessage(queuedMessage);
60                     sleep(messageWaitTime);
61                 } catch (InterruptedException e) {
62                     // That's our signal to stop
63                     break;
64                 } catch (Exception e) {
65                     logger.warn("Error while polling/sending message", e);
66                 }
67             }
68         }
69     }
70
71     /** Default maximum number of attempts to send a message */
72     private static final int MAX_RETRIES = 1;
73
74     /** After exceeding this threshold the Stick is set offline */
75     private static final int MAX_SEQUENTIAL_WRITE_ERRORS = 15;
76
77     private final Logger logger = LoggerFactory.getLogger(PlugwiseMessageSender.class);
78     private final PlugwiseCommunicationContext context;
79
80     private int sequentialWriteErrors;
81
82     private @Nullable WritableByteChannel outputChannel;
83     private @Nullable MessageSenderThread thread;
84
85     public PlugwiseMessageSender(PlugwiseCommunicationContext context) {
86         this.context = context;
87     }
88
89     public void sendMessage(Message message, PlugwiseMessagePriority priority) throws IOException {
90         if (sequentialWriteErrors > MAX_SEQUENTIAL_WRITE_ERRORS) {
91             throw new IOException("Error writing to serial port " + context.getConfiguration().getSerialPort() + " ("
92                     + sequentialWriteErrors + " times)");
93         }
94
95         logger.debug("Adding {} message to sendQueue: {}", priority, message);
96         context.getSendQueue().put(new PlugwiseQueuedMessage(message, priority));
97     }
98
99     private void sendMessage(PlugwiseQueuedMessage queuedMessage) throws InterruptedException {
100         if (queuedMessage.getAttempts() < MAX_RETRIES) {
101             queuedMessage.increaseAttempts();
102
103             Message message = queuedMessage.getMessage();
104             String messageHexString = message.toHexString();
105
106             WritableByteChannel localOutputChannel = outputChannel;
107             if (localOutputChannel == null) {
108                 logger.warn("Error writing '{}' to serial port {}: outputChannel is null", messageHexString,
109                         context.getConfiguration().getSerialPort());
110                 sequentialWriteErrors++;
111                 return;
112             }
113
114             String packetString = PROTOCOL_HEADER + messageHexString + PROTOCOL_TRAILER;
115             ByteBuffer bytebuffer = ByteBuffer.allocate(packetString.length());
116             bytebuffer.put(packetString.getBytes());
117             bytebuffer.rewind();
118
119             try {
120                 logger.debug("Sending: {} as {}", message, messageHexString);
121                 localOutputChannel.write(bytebuffer);
122                 sequentialWriteErrors = 0;
123             } catch (IOException e) {
124                 logger.warn("Error writing '{}' to serial port {}: {}", messageHexString,
125                         context.getConfiguration().getSerialPort(), e.getMessage());
126                 sequentialWriteErrors++;
127                 return;
128             }
129
130             // Poll the acknowledgement message for at most 1 second, normally it is received within 75ms
131             AcknowledgementMessage ack = context.getAcknowledgedQueue().poll(1, TimeUnit.SECONDS);
132             logger.debug("Removing from acknowledgedQueue: {}", ack);
133
134             if (ack == null) {
135                 String logMsg = "Error sending: No ACK received after 1 second: {}";
136                 if (NETWORK_STATUS_REQUEST.equals(message.getType())) {
137                     // Log on debug because the Stick will be set offline anyhow
138                     logger.debug(logMsg, messageHexString);
139                 } else {
140                     logger.warn(logMsg, messageHexString);
141                 }
142             } else if (!ack.isSuccess()) {
143                 if (ack.isError()) {
144                     logger.warn("Error sending: Negative ACK: {}", messageHexString);
145                 }
146             } else {
147                 // Update the sent message with the new sequence number
148                 message.setSequenceNumber(ack.getSequenceNumber());
149
150                 // Place the sent message in the sent queue
151                 logger.debug("Adding to sentQueue: {}", message);
152                 context.getSentQueueLock().lock();
153                 try {
154                     if (context.getSentQueue().size() == PlugwiseCommunicationContext.MAX_BUFFER_SIZE) {
155                         // For some reason Plugwise devices, or the Stick, does not send responses to Requests.
156                         // They clog the sent queue. Let's flush some part of the queue
157                         PlugwiseQueuedMessage someMessage = context.getSentQueue().poll();
158                         logger.debug("Flushing from sentQueue: {}", someMessage);
159                     }
160                     context.getSentQueue().put(queuedMessage);
161                 } finally {
162                     context.getSentQueueLock().unlock();
163                 }
164             }
165         } else {
166             // Max attempts reached. We give up, and to a network reset
167             logger.warn("Giving up on Plugwise message after {} attempts: {}", queuedMessage.getAttempts(),
168                     queuedMessage.getMessage());
169         }
170     }
171
172     @SuppressWarnings("resource")
173     public void start() throws PlugwiseInitializationException {
174         SerialPort serialPort = context.getSerialPort();
175         if (serialPort == null) {
176             throw new PlugwiseInitializationException("Failed to get serial port output stream because port is null");
177         }
178
179         try {
180             outputChannel = Channels.newChannel(serialPort.getOutputStream());
181         } catch (IOException e) {
182             throw new PlugwiseInitializationException("Failed to get serial port output stream", e);
183         }
184
185         sequentialWriteErrors = 0;
186         thread = new MessageSenderThread(context.getConfiguration().getMessageWaitTime());
187         thread.start();
188     }
189
190     public void stop() {
191         PlugwiseUtils.stopBackgroundThread(thread);
192         if (outputChannel != null) {
193             try {
194                 outputChannel.close();
195                 outputChannel = null;
196             } catch (IOException e) {
197                 logger.warn("Failed to close output channel", e);
198             }
199         }
200     }
201 }