]> git.basschouten.com Git - openhab-addons.git/blob
b138c40f4982d8db58d58fea7067b74e93b12d64
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2023 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.openthermgateway.internal;
14
15 import static org.openhab.binding.openthermgateway.internal.OpenThermGatewayBindingConstants.BINDING_ID;
16
17 import java.io.BufferedReader;
18 import java.io.IOException;
19 import java.io.InputStreamReader;
20 import java.io.PrintWriter;
21 import java.net.InetSocketAddress;
22 import java.net.Socket;
23 import java.time.Instant;
24 import java.util.ArrayDeque;
25 import java.util.Queue;
26 import java.util.concurrent.ExecutionException;
27 import java.util.concurrent.ExecutorService;
28 import java.util.concurrent.Executors;
29 import java.util.concurrent.Future;
30 import java.util.concurrent.TimeUnit;
31 import java.util.concurrent.TimeoutException;
32
33 import org.eclipse.jdt.annotation.NonNullByDefault;
34 import org.eclipse.jdt.annotation.Nullable;
35 import org.openhab.core.common.NamedThreadFactory;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
38
39 /**
40  * The {@link OpenThermGatewaySocketConnector} is responsible for handling the socket connection
41  *
42  * @author Arjen Korevaar - Initial contribution
43  * @author Arjan Mels - Improved robustness by re-sending commands, handling all message types (not only Boiler)
44  * @author Andrew Fiddian-Green - Improve thread interruption, socket timeouts, exception handling, FIFO command queue
45  */
46 @NonNullByDefault
47 public class OpenThermGatewaySocketConnector implements OpenThermGatewayConnector {
48     private static final int COMMAND_RESPONSE_MIN_WAIT_TIME_MILLISECONDS = 100;
49     private static final int COMMAND_RESPONSE_MAX_WAIT_TIME_MILLISECONDS = 5000;
50     private static final int MAXIMUM_FIFO_BUFFER_SIZE = 20;
51
52     private static final String WDT_RESET_RESPONSE_MESSAGE = "WDT reset";
53
54     private final Logger logger = LoggerFactory.getLogger(OpenThermGatewaySocketConnector.class);
55
56     private final OpenThermGatewayCallback callback;
57     private final String ipaddress;
58     private final int port;
59     private final int connectTimeoutMilliseconds;
60     private final int readTimeoutMilliSeconds;
61
62     private @Nullable volatile PrintWriter writer;
63     private @Nullable volatile Thread thread;
64     private @Nullable Future<Boolean> future;
65     private @Nullable ExecutorService executor;
66
67     /**
68      * FIFO queue of commands that are pending being sent to the gateway. That is commands that are either not yet sent,
69      * or sent but not yet acknowledged and pending possible re-sending.
70      *
71      * Note: we must use 'synchronized' when accessing this object to ensure proper thread safety.
72      */
73     private final Queue<PendingCommand> pendingCommands = new ArrayDeque<>(MAXIMUM_FIFO_BUFFER_SIZE);
74
75     /**
76      * Wrapper for a command entry in the pending command FIFO queue.
77      *
78      * @author AndrewFG - initial contribution
79      */
80     private class PendingCommand {
81         protected final GatewayCommand command;
82         protected final Instant expiryTime = Instant.now().plusMillis(COMMAND_RESPONSE_MAX_WAIT_TIME_MILLISECONDS);
83         protected Instant sentTime = Instant.MIN;
84
85         protected PendingCommand(GatewayCommand command) {
86             this.command = command;
87         }
88
89         /**
90          * Check if the command has been sent to the gateway.
91          *
92          * @return true if it has been sent
93          */
94         protected boolean sent() {
95             return Instant.MIN.isBefore(sentTime);
96         }
97
98         /**
99          * Check if the command is ready to send (or re-send) to the gateway.
100          *
101          * @return true if the command has either not been sent, or sent but not acknowledged within due time i.e. it
102          *         needs to be re-sent
103          */
104         protected boolean readyToSend() {
105             return sentTime.isBefore(Instant.now().minusMillis(COMMAND_RESPONSE_MIN_WAIT_TIME_MILLISECONDS));
106         }
107
108         /**
109          * Check if the command has expired.
110          *
111          * @return true if the expiry time has expired
112          */
113         protected boolean expired() {
114             return Instant.now().isAfter(expiryTime);
115         }
116     }
117
118     public OpenThermGatewaySocketConnector(OpenThermGatewayCallback callback, OpenThermGatewayConfiguration config) {
119         this.callback = callback;
120         ipaddress = config.ipaddress;
121         port = config.port;
122         connectTimeoutMilliseconds = config.connectTimeoutSeconds * 1000;
123         readTimeoutMilliSeconds = config.readTimeoutSeconds * 1000;
124     }
125
126     @Override
127     public Boolean call() throws Exception {
128         thread = Thread.currentThread();
129         try (Socket socket = new Socket()) {
130             logger.debug("Connecting OpenThermGatewaySocketConnector to {}:{}", this.ipaddress, this.port);
131             callback.connectionStateChanged(ConnectionState.CONNECTING);
132
133             socket.connect(new InetSocketAddress(ipaddress, port), connectTimeoutMilliseconds);
134             socket.setSoTimeout(readTimeoutMilliSeconds);
135
136             logger.debug("OpenThermGatewaySocketConnector connected");
137             callback.connectionStateChanged(ConnectionState.CONNECTED);
138
139             try (BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
140                     PrintWriter wrt = new PrintWriter(socket.getOutputStream(), true)) {
141                 // Make writer accessible on class level
142                 writer = wrt;
143
144                 sendCommand(GatewayCommand.parse(GatewayCommandCode.PRINTREPORT, "A"));
145                 // Set the OTGW to report every message it receives and transmits
146                 sendCommand(GatewayCommand.parse(GatewayCommandCode.PRINTSUMMARY, "0"));
147
148                 while (!Thread.currentThread().isInterrupted()) {
149                     @Nullable
150                     String message = reader.readLine();
151
152                     if (message != null) {
153                         logger.trace("Read: {}", message);
154                         handleMessage(message);
155                     } else {
156                         logger.debug("Received NULL message from OpenTherm Gateway (EOF)");
157                         break;
158                     }
159                 }
160                 // disable reporting every message (for cleaner re-starting)
161                 sendCommand(GatewayCommand.parse(GatewayCommandCode.PRINTSUMMARY, "1"));
162             } catch (IOException ex) {
163                 logger.warn("Error communicating with OpenTherm Gateway: '{}'", ex.getMessage());
164             }
165         } catch (IOException ex) {
166             logger.warn("Unable to connect to the OpenTherm Gateway: '{}'", ex.getMessage());
167         }
168         thread = null;
169         writer = null;
170         logger.debug("OpenThermGatewaySocketConnector disconnected");
171         callback.connectionStateChanged(ConnectionState.DISCONNECTED);
172         return true;
173     }
174
175     @Override
176     public void stop() {
177         logger.debug("Stopping OpenThermGatewaySocketConnector");
178
179         Thread thread = this.thread;
180         Future<Boolean> future = this.future;
181         ExecutorService executor = this.executor;
182
183         if (executor != null) {
184             executor.shutdown();
185         }
186         if ((thread != null) && thread.isAlive()) {
187             thread.interrupt();
188         }
189         if (future != null) {
190             try {
191                 future.get(readTimeoutMilliSeconds, TimeUnit.MILLISECONDS);
192             } catch (ExecutionException e) {
193                 // expected exception due to e.g. IOException on socket close
194             } catch (TimeoutException | InterruptedException e) {
195                 // unexpected exception
196                 logger.warn("stop() exception '{}' => PLEASE REPORT !!", e.getMessage());
197             }
198         }
199
200         this.thread = null;
201         this.future = null;
202         this.executor = null;
203     }
204
205     @Override
206     public void start() {
207         logger.debug("Starting OpenThermGatewaySocketConnector");
208         ExecutorService executor = this.executor = Executors
209                 .newSingleThreadExecutor(new NamedThreadFactory("binding-" + BINDING_ID));
210         future = executor.submit(this);
211     }
212
213     @Override
214     public synchronized boolean isConnected() {
215         Thread thread = this.thread;
216         return (thread != null) && thread.isAlive();
217     }
218
219     @Override
220     public void sendCommand(GatewayCommand command) {
221         synchronized (pendingCommands) {
222             // append the command to the end of the FIFO queue
223             if (MAXIMUM_FIFO_BUFFER_SIZE < pendingCommands.size()
224                     || !pendingCommands.offer(new PendingCommand(command))) {
225                 logger.warn("Command refused: FIFO buffer overrun => PLEASE REPORT !!");
226             }
227             // send the FIFO head command, which may or may not be the one just added
228             pendingCommandsSendHeadCommandIfReady();
229         } // release the pendingCommands lock
230     }
231
232     /**
233      * Process the incoming message. Remove any expired commands from the queue. Check if the incoming message is an
234      * acknowledgement. If it is the acknowledgement for the FIFO head command, remove it from the queue. Try to send
235      * the (next) FIFO head command, if it exists, and is ready to send. And finally if the message is not an
236      * acknowledgement, check if it is a valid message, and if so, pass it to the gateway Thing handler for processing.
237      *
238      * @param message the incoming message received from the gateway
239      */
240     private void handleMessage(String message) {
241         // check if the message is a command acknowledgement e.g. having the form "XX: yyy"
242         boolean isCommandAcknowledgement = (message.length() > 2) && (message.charAt(2) == ':');
243
244         synchronized (pendingCommands) {
245             // remove all expired commands
246             pendingCommandsRemoveAllExpiredCommands();
247
248             // if acknowledgement is for the FIFO head command, remove it from the queue
249             if (isCommandAcknowledgement) {
250                 pendingCommandsRemoveHeadCommandIfAcknowledgement(message);
251             }
252
253             // (re-)send the FIFO head command, if it exists and is ready to send
254             pendingCommandsSendHeadCommandIfReady();
255         } // release the pendingCommands lock
256
257         if (isCommandAcknowledgement) {
258             callback.receiveAcknowledgement(message);
259         } else if (message.startsWith(WDT_RESET_RESPONSE_MESSAGE)) {
260             logger.warn("OpenTherm Gateway was reset by its Watch-Dog Timer!");
261         } else {
262             Message msg = Message.parse(message);
263
264             // ignore and log bad messages
265             if (msg == null) {
266                 logger.debug("Received message: {}, (unknown)", message);
267                 return;
268             }
269
270             // pass good messages to the Thing handler for processing
271             if (msg.getMessageType() == MessageType.READACK || msg.getMessageType() == MessageType.WRITEDATA
272                     || msg.getID() == 0 || msg.getID() == 1) {
273                 callback.receiveMessage(msg);
274             }
275         }
276     }
277
278     /**
279      * If there is a FIFO head command that is ready to (re-)send, then send it.
280      */
281     private void pendingCommandsSendHeadCommandIfReady() {
282         // process the command at the head of the queue
283         PendingCommand headCommand = pendingCommands.peek();
284         if (headCommand != null && headCommand.readyToSend()) {
285             String message = headCommand.command.toFullString();
286
287             // transmit the command string
288             PrintWriter writer = this.writer;
289             if (isConnected() && (writer != null)) {
290                 writer.print(message + "\r\n");
291                 writer.flush();
292                 if (writer.checkError()) {
293                     logger.warn("Error sending command to OpenTherm Gateway => PLEASE REPORT !!");
294                     stop();
295                 }
296                 if (logger.isTraceEnabled()) {
297                     logger.trace("Sent: {}{}", message, headCommand.sent() ? " (repeat)" : "");
298                 }
299                 headCommand.sentTime = Instant.now();
300             } else {
301                 logger.debug("Unable to send command: {}. OpenThermGatewaySocketConnector is not connected.", message);
302             }
303         }
304     }
305
306     /**
307      * If the acknowledgement message corresponds to the FIFO head command then remove it from the queue.
308      *
309      * @param message must be an acknowledgement message in the form "XX: yyy"
310      */
311     private void pendingCommandsRemoveHeadCommandIfAcknowledgement(String message) {
312         PendingCommand headCommand = pendingCommands.peek();
313         if (headCommand != null && headCommand.command.getCode().equals(message.substring(0, 2))) {
314             pendingCommands.poll();
315         }
316     }
317
318     /**
319      * Remove all expired commands from the queue.
320      */
321     private void pendingCommandsRemoveAllExpiredCommands() {
322         pendingCommands.removeIf(pendingCommand -> pendingCommand.expired());
323     }
324 }