2 * Copyright (c) 2010-2022 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.openthermgateway.internal;
15 import static org.openhab.binding.openthermgateway.internal.OpenThermGatewayBindingConstants.BINDING_ID;
17 import java.io.BufferedReader;
18 import java.io.IOException;
19 import java.io.InputStreamReader;
20 import java.io.PrintWriter;
21 import java.net.InetSocketAddress;
22 import java.net.Socket;
23 import java.util.AbstractMap;
25 import java.util.Map.Entry;
26 import java.util.concurrent.ConcurrentHashMap;
27 import java.util.concurrent.ExecutionException;
28 import java.util.concurrent.ExecutorService;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.Future;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.TimeoutException;
34 import org.eclipse.jdt.annotation.NonNullByDefault;
35 import org.eclipse.jdt.annotation.Nullable;
36 import org.openhab.core.common.NamedThreadFactory;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
41 * The {@link OpenThermGatewaySocketConnector} is responsible for handling the socket connection to the OpenTherm Gateway.
43 * @author Arjen Korevaar - Initial contribution
44 * @author Arjan Mels - Improved robustness by re-sending commands, handling all message types (not only Boiler)
47 public class OpenThermGatewaySocketConnector implements OpenThermGatewayConnector {
// Bounds, in milliseconds and measured from the time a command was recorded as pending, of the window in
// which handleMessage() re-sends a command that has not been confirmed yet. Entries older than the upper
// bound are dropped from the pending map instead of being re-sent.
48 private static final int COMMAND_RESPONSE_MIN_WAIT_TIME_MILLISECONDS = 100;
49 private static final int COMMAND_RESPONSE_MAX_WAIT_TIME_MILLISECONDS = 5000;
51 private final Logger logger = LoggerFactory.getLogger(OpenThermGatewaySocketConnector.class);
// Receives connection state changes (call()) and parsed gateway messages (handleMessage()).
53 private final OpenThermGatewayCallback callback;
// Address and port the socket connects to in call().
54 private final String ipaddress;
55 private final int port;
// Passed to Socket.connect() as the connect timeout.
56 private final int connectTimeoutMilliseconds;
// Passed to Socket.setSoTimeout(); also used as the maximum wait for the worker task when stopping.
57 private final int readTimeoutMilliSeconds;
// Output side of the socket; sendCommand() reads this field and treats null as "nothing to write to".
59 private @Nullable volatile PrintWriter writer;
// Thread that executes call(); isConnected() reports whether it is alive.
60 private @Nullable volatile Thread thread;
// Handle of the task submitted in start().
61 private @Nullable Future<Boolean> future;
// Executor created in start() to run this connector.
62 private @Nullable ExecutorService executor;
// Commands awaiting confirmation, keyed by command code; each value pairs the time the command was last
// recorded by sendCommand() with the command itself.
64 private Map<String, Entry<Long, GatewayCommand>> pendingCommands = new ConcurrentHashMap<>();
/**
 * Creates a connector for the gateway described by the given configuration.
 *
 * @param callback receiver of connection state changes and of parsed gateway messages
 * @param config supplies the gateway address and the connect and read timeouts (both in seconds)
 */
66 public OpenThermGatewaySocketConnector(OpenThermGatewayCallback callback, OpenThermGatewayConfiguration config) {
67 this.callback = callback;
68 ipaddress = config.ipaddress;
// NOTE(review): the final field 'port' is not assigned in the lines visible here; presumably it is also
// taken from config - confirm.
// The configuration expresses the timeouts in seconds, while Socket.connect() and Socket.setSoTimeout()
// (used in call()) take milliseconds.
70 connectTimeoutMilliseconds = config.connectTimeoutSeconds * 1000;
71 readTimeoutMilliSeconds = config.readTimeoutSeconds * 1000;
/**
 * Task body of the connector: opens the socket to the gateway, sends two initial commands and then reads
 * the gateway output line by line, handing every line to handleMessage().
 *
 * Progress is reported through the callback as CONNECTING, CONNECTED and finally DISCONNECTED. The
 * IOException handlers shown here log a warning instead of rethrowing.
 *
 * NOTE(review): the return statement is not visible in this excerpt, so the returned value is not
 * documented here - confirm against the full file.
 */
75 public Boolean call() throws Exception {
// Remember the executing thread; isConnected() tests its liveness.
76 thread = Thread.currentThread();
// try-with-resources closes the socket when this block is left.
77 try (Socket socket = new Socket()) {
78 logger.debug("Connecting OpenThermGatewaySocketConnector to {}:{}", this.ipaddress, this.port);
79 callback.connectionStateChanged(ConnectionState.CONNECTING);
// Bound the connection attempt, then bound every blocking read: once the read timeout elapses without
// data, reading fails with a SocketTimeoutException (an IOException).
81 socket.connect(new InetSocketAddress(ipaddress, port), connectTimeoutMilliseconds);
82 socket.setSoTimeout(readTimeoutMilliSeconds);
84 logger.debug("OpenThermGatewaySocketConnector connected");
85 callback.connectionStateChanged(ConnectionState.CONNECTED);
// NOTE(review): neither the InputStreamReader nor the PrintWriter names a charset, so the platform default
// is used for both directions; consider passing an explicit charset.
// The PrintWriter is created with auto-flush enabled, which only applies to println/printf/format.
87 try (BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
88 PrintWriter wrt = new PrintWriter(socket.getOutputStream(), true)) {
89 // Make writer accessible on class level
// NOTE(review): the assignment to the 'writer' field announced by the comment above is not visible in
// this excerpt.
92 sendCommand(GatewayCommand.parse(GatewayCommandCode.PRINTREPORT, "A"));
93 // Set the OTGW to report every message it receives and transmits
94 sendCommand(GatewayCommand.parse(GatewayCommandCode.PRINTSUMMARY, "0"));
// Keep reading until the executing thread is interrupted.
96 while (!Thread.currentThread().isInterrupted()) {
98 String message = reader.readLine();
100 if (message != null) {
101 handleMessage(message);
// readLine() returns null once the end of the stream has been reached.
103 logger.debug("Received NULL message from OpenTherm Gateway (EOF)");
// Judging by the log texts, this handler covers failures on the established connection and the next one
// covers failures while connecting (the brace structure is not visible here).
107 } catch (IOException ex) {
108 logger.warn("Error communicating with OpenTherm Gateway: '{}'", ex.getMessage());
110 } catch (IOException ex) {
111 logger.warn("Unable to connect to the OpenTherm Gateway: '{}'", ex.getMessage());
// Tell the callback that the connection is gone.
115 logger.debug("OpenThermGatewaySocketConnector disconnected");
116 callback.connectionStateChanged(ConnectionState.DISCONNECTED);
// Body of the stop routine (its signature line is not visible in this excerpt).
122 logger.debug("Stopping OpenThermGatewaySocketConnector");
// Copy the nullable fields into locals so that the null checks below operate on stable values.
124 Thread thread = this.thread;
125 Future<Boolean> future = this.future;
126 ExecutorService executor = this.executor;
// NOTE(review): the statements guarded by the next two conditions are not visible here; presumably the
// executor is shut down and the worker thread is interrupted - confirm.
128 if (executor != null) {
131 if ((thread != null) && thread.isAlive()) {
134 if (future != null) {
// Wait for the worker task to finish, but no longer than the socket read timeout.
136 future.get(readTimeoutMilliSeconds, TimeUnit.MILLISECONDS);
137 } catch (ExecutionException e) {
138 // expected exception due to e.g. IOException on socket close
139 } catch (TimeoutException | InterruptedException e) {
140 // unexpected exception
// NOTE(review): when an InterruptedException lands here, the interrupt status is not restored in the
// lines visible here; consider calling Thread.currentThread().interrupt().
141 logger.warn("stop() exception '{}' => PLEASE REPORT !!", e.getMessage());
// Drop the reference to the executor; start() creates a new one.
147 this.executor = null;
151 public void start() {
152 logger.debug("Starting OpenThermGatewaySocketConnector");
153 ExecutorService executor = this.executor = Executors
154 .newSingleThreadExecutor(new NamedThreadFactory("binding-" + BINDING_ID));
155 future = executor.submit(this);
159 public synchronized boolean isConnected() {
160 Thread thread = this.thread;
161 return (thread != null) && thread.isAlive();
/**
 * Records the given command as pending and, when the connector is connected and a writer is available,
 * writes it to the gateway terminated by CR LF.
 *
 * @param command the command to send; its code is the key under which it is tracked until confirmed
 */
165 public synchronized void sendCommand(GatewayCommand command) {
// Read the volatile field once so the null check and the write below use the same writer.
166 PrintWriter wrt = writer;
// The command is stored with the current time before the connection check, so it is tracked as pending
// (and is a candidate for re-sending by handleMessage()) even when it cannot be written right now.
// A pending entry with the same code is replaced.
168 pendingCommands.put(command.getCode(),
169 new AbstractMap.SimpleImmutableEntry<>(System.currentTimeMillis(), command));
171 String msg = command.toFullString();
173 if (isConnected() && (wrt != null)) {
174 logger.debug("Sending message: {}", msg);
175 wrt.print(msg + "\r\n");
// print() neither auto-flushes nor throws IOException; checkError() flushes the stream and reports
// whether an error has occurred on it.
177 if (wrt.checkError()) {
178 logger.warn("sendCommand() error sending message to OpenTherm Gateway => PLEASE REPORT !!");
// NOTE(review): lines between the warning above and the message below are not visible in this excerpt;
// presumably the following statement is the not-connected branch of the check above - confirm.
182 logger.debug("Unable to send message: {}. OpenThermGatewaySocketConnector is not connected.", msg);
/**
 * Processes one line received from the gateway: handles command confirmations, re-sends or expires
 * pending commands, and forwards selected parsed messages to the callback.
 *
 * @param message a single line as read from the socket
 */
186 private void handleMessage(String message) {
// A line with ':' as its third character is treated as a command confirmation: the first two characters
// are the command code, the text after the colon is the value.
187 if (message.length() > 2 && message.charAt(2) == ':') {
188 String code = message.substring(0, 2);
189 String value = message.substring(3);
191 logger.debug("Received command confirmation: {}: {}", code, value);
// The command is confirmed, so it no longer needs to be tracked.
192 pendingCommands.remove(code);
// NOTE(review): lines following the removal above are not visible in this excerpt (possibly an early
// return) - confirm whether confirmations also reach the code below.
196 long currentTime = System.currentTimeMillis();
// pendingCommands is a ConcurrentHashMap, whose iterators tolerate the put (via sendCommand()) and remove
// performed inside this loop.
198 for (Entry<Long, GatewayCommand> timeAndCommand : pendingCommands.values()) {
199 long responseTime = timeAndCommand.getKey() + COMMAND_RESPONSE_MIN_WAIT_TIME_MILLISECONDS;
200 long timeoutTime = timeAndCommand.getKey() + COMMAND_RESPONSE_MAX_WAIT_TIME_MILLISECONDS;
// Re-send when the entry is older than the minimum wait but not older than the maximum wait.
// NOTE(review): sendCommand() stores the command again with a fresh timestamp, so every re-send restarts
// this window; the timeout branch is only reached if no re-send happened for the maximum wait - confirm
// that this is intended.
202 if (currentTime > responseTime && currentTime <= timeoutTime) {
203 logger.debug("Resending command: {}", timeAndCommand.getValue());
204 sendCommand(timeAndCommand.getValue());
205 } else if (currentTime > timeoutTime) {
// Give up on a command that stayed unconfirmed for longer than the maximum wait.
206 pendingCommands.remove(timeAndCommand.getValue().getCode());
210 Message msg = Message.parse(message);
// NOTE(review): the condition guarding the next statement is not visible here; presumably it covers the
// case where the line could not be parsed - confirm.
213 logger.trace("Received message: {}, (unknown)", message);
216 logger.trace("Received message: {}, {} {} {}", message, msg.getID(), msg.getCodeType(), msg.getMessageType());
// Only READACK and WRITEDATA messages are forwarded, plus any message with ID 0 or 1 regardless of type.
217 if (msg.getMessageType() == MessageType.READACK || msg.getMessageType() == MessageType.WRITEDATA
218 || msg.getID() == 0 || msg.getID() == 1) {
219 callback.receiveMessage(msg);