/**
 * Copyright (c) 2010-2023 Contributors to the openHAB project
 *
 * See the NOTICE file(s) distributed with this work for additional
 * information.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License 2.0 which is available at
 * http://www.eclipse.org/legal/epl-2.0
 *
 * SPDX-License-Identifier: EPL-2.0
 */
13 package org.openhab.binding.openthermgateway.internal;
15 import static org.openhab.binding.openthermgateway.internal.OpenThermGatewayBindingConstants.BINDING_ID;
17 import java.io.BufferedReader;
18 import java.io.IOException;
19 import java.io.InputStreamReader;
20 import java.io.PrintWriter;
21 import java.net.InetSocketAddress;
22 import java.net.Socket;
23 import java.time.Instant;
24 import java.util.ArrayDeque;
25 import java.util.Queue;
26 import java.util.concurrent.ExecutionException;
27 import java.util.concurrent.ExecutorService;
28 import java.util.concurrent.Executors;
29 import java.util.concurrent.Future;
30 import java.util.concurrent.TimeUnit;
31 import java.util.concurrent.TimeoutException;
33 import org.eclipse.jdt.annotation.NonNullByDefault;
34 import org.eclipse.jdt.annotation.Nullable;
35 import org.openhab.core.common.NamedThreadFactory;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
/**
 * The {@link OpenThermGatewaySocketConnector} is responsible for handling the socket connection
 * to the OpenTherm Gateway.
 *
 * @author Arjen Korevaar - Initial contribution
 * @author Arjan Mels - Improved robustness by re-sending commands, handling all message types (not only Boiler)
 * @author Andrew Fiddian-Green - Improve thread interruption, socket timeouts, exception handling, FIFO command queue
 */
47 public class OpenThermGatewaySocketConnector implements OpenThermGatewayConnector {
48 private static final int COMMAND_RESPONSE_MIN_WAIT_TIME_MILLISECONDS = 100;
49 private static final int COMMAND_RESPONSE_MAX_WAIT_TIME_MILLISECONDS = 5000;
50 private static final int MAXIMUM_FIFO_BUFFER_SIZE = 20;
52 private static final String WDT_RESET_RESPONSE_MESSAGE = "WDT reset";
54 private final Logger logger = LoggerFactory.getLogger(OpenThermGatewaySocketConnector.class);
56 private final OpenThermGatewayCallback callback;
57 private final String ipaddress;
58 private final int port;
59 private final int connectTimeoutMilliseconds;
60 private final int readTimeoutMilliSeconds;
62 private @Nullable volatile PrintWriter writer;
63 private @Nullable volatile Thread thread;
64 private @Nullable Future<Boolean> future;
65 private @Nullable ExecutorService executor;
68 * FIFO queue of commands that are pending being sent to the gateway. That is commands that are either not yet sent,
69 * or sent but not yet acknowledged and pending possible re-sending.
71 * Note: we must use 'synchronized' when accessing this object to ensure proper thread safety.
73 private final Queue<PendingCommand> pendingCommands = new ArrayDeque<>(MAXIMUM_FIFO_BUFFER_SIZE);
76 * Wrapper for a command entry in the pending command FIFO queue.
78 * @author AndrewFG - initial contribution
80 private class PendingCommand {
81 protected final GatewayCommand command;
82 protected final Instant expiryTime = Instant.now().plusMillis(COMMAND_RESPONSE_MAX_WAIT_TIME_MILLISECONDS);
83 protected Instant sentTime = Instant.MIN;
85 protected PendingCommand(GatewayCommand command) {
86 this.command = command;
90 * Check if the command has been sent to the gateway.
92 * @return true if it has been sent
94 protected boolean sent() {
95 return Instant.MIN.isBefore(sentTime);
99 * Check if the command is ready to send (or re-send) to the gateway.
101 * @return true if the command has either not been sent, or sent but not acknowledged within due time i.e. it
102 * needs to be re-sent
104 protected boolean readyToSend() {
105 return sentTime.isBefore(Instant.now().minusMillis(COMMAND_RESPONSE_MIN_WAIT_TIME_MILLISECONDS));
109 * Check if the command has expired.
111 * @return true if the expiry time has expired
113 protected boolean expired() {
114 return Instant.now().isAfter(expiryTime);
/**
 * Create a connector that reports to the given callback and uses the connection settings of the given
 * configuration.
 *
 * @param callback receiver of connection state changes, acknowledgements and messages
 * @param config supplies the gateway address and the connect / read timeouts; the timeouts are given in seconds
 *            and are converted to milliseconds here
 */
public OpenThermGatewaySocketConnector(OpenThermGatewayCallback callback, OpenThermGatewayConfiguration config) {
    this.callback = callback;
    ipaddress = config.ipaddress;
    // the final field 'port' must be assigned here; it is used when connecting the socket
    // NOTE(review): assumes the configuration exposes an int 'port' field like its sibling settings - confirm
    port = config.port;
    connectTimeoutMilliseconds = config.connectTimeoutSeconds * 1000;
    readTimeoutMilliSeconds = config.readTimeoutSeconds * 1000;
}
127 public Boolean call() throws Exception {
128 thread = Thread.currentThread();
129 try (Socket socket = new Socket()) {
130 logger.debug("Connecting OpenThermGatewaySocketConnector to {}:{}", this.ipaddress, this.port);
131 callback.connectionStateChanged(ConnectionState.CONNECTING);
133 socket.connect(new InetSocketAddress(ipaddress, port), connectTimeoutMilliseconds);
134 socket.setSoTimeout(readTimeoutMilliSeconds);
136 logger.debug("OpenThermGatewaySocketConnector connected");
137 callback.connectionStateChanged(ConnectionState.CONNECTED);
139 try (BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
140 PrintWriter wrt = new PrintWriter(socket.getOutputStream(), true)) {
141 // Make writer accessible on class level
144 sendCommand(GatewayCommand.parse(GatewayCommandCode.PRINTREPORT, "A"));
145 // Set the OTGW to report every message it receives and transmits
146 sendCommand(GatewayCommand.parse(GatewayCommandCode.PRINTSUMMARY, "0"));
148 while (!Thread.currentThread().isInterrupted()) {
150 String message = reader.readLine();
152 if (message != null) {
153 logger.trace("Read: {}", message);
154 handleMessage(message);
156 logger.debug("Received NULL message from OpenTherm Gateway (EOF)");
160 // disable reporting every message (for cleaner re-starting)
161 sendCommand(GatewayCommand.parse(GatewayCommandCode.PRINTSUMMARY, "1"));
162 } catch (IOException ex) {
163 logger.warn("Error communicating with OpenTherm Gateway: '{}'", ex.getMessage());
165 } catch (IOException ex) {
166 logger.warn("Unable to connect to the OpenTherm Gateway: '{}'", ex.getMessage());
170 logger.debug("OpenThermGatewaySocketConnector disconnected");
171 callback.connectionStateChanged(ConnectionState.DISCONNECTED);
177 logger.debug("Stopping OpenThermGatewaySocketConnector");
179 Thread thread = this.thread;
180 Future<Boolean> future = this.future;
181 ExecutorService executor = this.executor;
183 if (executor != null) {
186 if ((thread != null) && thread.isAlive()) {
189 if (future != null) {
191 future.get(readTimeoutMilliSeconds, TimeUnit.MILLISECONDS);
192 } catch (ExecutionException e) {
193 // expected exception due to e.g. IOException on socket close
194 } catch (TimeoutException | InterruptedException e) {
195 // unexpected exception
196 logger.warn("stop() exception '{}' => PLEASE REPORT !!", e.getMessage());
202 this.executor = null;
206 public void start() {
207 logger.debug("Starting OpenThermGatewaySocketConnector");
208 ExecutorService executor = this.executor = Executors
209 .newSingleThreadExecutor(new NamedThreadFactory("binding-" + BINDING_ID));
210 future = executor.submit(this);
214 public synchronized boolean isConnected() {
215 Thread thread = this.thread;
216 return (thread != null) && thread.isAlive();
220 public void sendCommand(GatewayCommand command) {
221 synchronized (pendingCommands) {
222 // append the command to the end of the FIFO queue
223 if (MAXIMUM_FIFO_BUFFER_SIZE < pendingCommands.size()
224 || !pendingCommands.offer(new PendingCommand(command))) {
225 logger.warn("Command refused: FIFO buffer overrun => PLEASE REPORT !!");
227 // send the FIFO head command, which may or may not be the one just added
228 pendingCommandsSendHeadCommandIfReady();
229 } // release the pendingCommands lock
233 * Process the incoming message. Remove any expired commands from the queue. Check if the incoming message is an
234 * acknowledgement. If it is the acknowledgement for the FIFO head command, remove it from the queue. Try to send
235 * the (next) FIFO head command, if it exists, and is ready to send. And finally if the message is not an
236 * acknowledgement, check if it is a valid message, and if so, pass it to the gateway Thing handler for processing.
238 * @param message the incoming message received from the gateway
240 private void handleMessage(String message) {
241 // check if the message is a command acknowledgement e.g. having the form "XX: yyy"
242 boolean isCommandAcknowledgement = (message.length() > 2) && (message.charAt(2) == ':');
244 synchronized (pendingCommands) {
245 // remove all expired commands
246 pendingCommandsRemoveAllExpiredCommands();
248 // if acknowledgement is for the FIFO head command, remove it from the queue
249 if (isCommandAcknowledgement) {
250 pendingCommandsRemoveHeadCommandIfAcknowledgement(message);
253 // (re-)send the FIFO head command, if it exists and is ready to send
254 pendingCommandsSendHeadCommandIfReady();
255 } // release the pendingCommands lock
257 if (isCommandAcknowledgement) {
258 callback.receiveAcknowledgement(message);
259 } else if (message.startsWith(WDT_RESET_RESPONSE_MESSAGE)) {
260 logger.warn("OpenTherm Gateway was reset by its Watch-Dog Timer!");
262 Message msg = Message.parse(message);
264 // ignore and log bad messages
266 logger.debug("Received message: {}, (unknown)", message);
270 // pass good messages to the Thing handler for processing
271 if (msg.getMessageType() == MessageType.READACK || msg.getMessageType() == MessageType.WRITEDATA
272 || msg.getID() == 0 || msg.getID() == 1) {
273 callback.receiveMessage(msg);
279 * If there is a FIFO head command that is ready to (re-)send, then send it.
281 private void pendingCommandsSendHeadCommandIfReady() {
282 // process the command at the head of the queue
283 PendingCommand headCommand = pendingCommands.peek();
284 if (headCommand != null && headCommand.readyToSend()) {
285 String message = headCommand.command.toFullString();
287 // transmit the command string
288 PrintWriter writer = this.writer;
289 if (isConnected() && (writer != null)) {
290 writer.print(message + "\r\n");
292 if (writer.checkError()) {
293 logger.warn("Error sending command to OpenTherm Gateway => PLEASE REPORT !!");
296 if (logger.isTraceEnabled()) {
297 logger.trace("Sent: {}{}", message, headCommand.sent() ? " (repeat)" : "");
299 headCommand.sentTime = Instant.now();
301 logger.debug("Unable to send command: {}. OpenThermGatewaySocketConnector is not connected.", message);
307 * If the acknowledgement message corresponds to the FIFO head command then remove it from the queue.
309 * @param message must be an acknowledgement message in the form "XX: yyy"
311 private void pendingCommandsRemoveHeadCommandIfAcknowledgement(String message) {
312 PendingCommand headCommand = pendingCommands.peek();
313 if (headCommand != null && headCommand.command.getCode().equals(message.substring(0, 2))) {
314 pendingCommands.poll();
319 * Remove all expired commands from the queue.
321 private void pendingCommandsRemoveAllExpiredCommands() {
322 pendingCommands.removeIf(pendingCommand -> pendingCommand.expired());