import java.io.PrintWriter;
import java.net.InetSocketAddress;
import java.net.Socket;
-import java.util.AbstractMap;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
+import java.time.Instant;
+import java.util.ArrayDeque;
+import java.util.Queue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
*
* @author Arjen Korevaar - Initial contribution
* @author Arjan Mels - Improved robustness by re-sending commands, handling all message types (not only Boiler)
+ * @author Andrew Fiddian-Green - Improve thread interruption, socket timeouts, exception handling, FIFO command queue
*/
@NonNullByDefault
public class OpenThermGatewaySocketConnector implements OpenThermGatewayConnector {
private static final int COMMAND_RESPONSE_MIN_WAIT_TIME_MILLISECONDS = 100;
private static final int COMMAND_RESPONSE_MAX_WAIT_TIME_MILLISECONDS = 5000;
+ private static final int MAXIMUM_FIFO_BUFFER_SIZE = 20;
+
+ private static final String WDT_RESET_RESPONSE_MESSAGE = "WDT reset";
private final Logger logger = LoggerFactory.getLogger(OpenThermGatewaySocketConnector.class);
private @Nullable Future<Boolean> future;
private @Nullable ExecutorService executor;
- private Map<String, Entry<Long, GatewayCommand>> pendingCommands = new ConcurrentHashMap<>();
+ /**
+ * FIFO queue of commands that are pending being sent to the gateway. That is commands that are either not yet sent,
+ * or sent but not yet acknowledged and pending possible re-sending.
+ *
+ * Note: we must use 'synchronized' when accessing this object to ensure proper thread safety.
+ */
+ private final Queue<PendingCommand> pendingCommands = new ArrayDeque<>(MAXIMUM_FIFO_BUFFER_SIZE);
+
+ /**
+ * Wrapper for a command entry in the pending command FIFO queue.
+ *
+ * @author AndrewFG - initial contribution
+ */
+ private class PendingCommand {
+ protected final GatewayCommand command;
+ protected final Instant expiryTime = Instant.now().plusMillis(COMMAND_RESPONSE_MAX_WAIT_TIME_MILLISECONDS);
+ protected Instant sentTime = Instant.MIN;
+
+ protected PendingCommand(GatewayCommand command) {
+ this.command = command;
+ }
+
+ /**
+ * Check if the command has been sent to the gateway.
+ *
+ * @return true if it has been sent
+ */
+ protected boolean sent() {
+ return Instant.MIN.isBefore(sentTime);
+ }
+
+ /**
+ * Check if the command is ready to send (or re-send) to the gateway.
+ *
+ * @return true if the command has either not been sent, or sent but not acknowledged within due time i.e. it
+ * needs to be re-sent
+ */
+ protected boolean readyToSend() {
+ return sentTime.isBefore(Instant.now().minusMillis(COMMAND_RESPONSE_MIN_WAIT_TIME_MILLISECONDS));
+ }
+
+ /**
+ * Check if the command has expired.
+ *
+ * @return true if the expiry time has expired
+ */
+ protected boolean expired() {
+ return Instant.now().isAfter(expiryTime);
+ }
+ }
public OpenThermGatewaySocketConnector(OpenThermGatewayCallback callback, OpenThermGatewayConfiguration config) {
this.callback = callback;
String message = reader.readLine();
if (message != null) {
+ logger.trace("Read: {}", message);
handleMessage(message);
} else {
logger.debug("Received NULL message from OpenTherm Gateway (EOF)");
break;
}
}
+ // disable reporting every message (for cleaner re-starting)
+ sendCommand(GatewayCommand.parse(GatewayCommandCode.PRINTSUMMARY, "1"));
} catch (IOException ex) {
logger.warn("Error communicating with OpenTherm Gateway: '{}'", ex.getMessage());
}
}
@Override
- public synchronized void sendCommand(GatewayCommand command) {
- PrintWriter wrt = writer;
-
- pendingCommands.put(command.getCode(),
- new AbstractMap.SimpleImmutableEntry<>(System.currentTimeMillis(), command));
-
- String msg = command.toFullString();
-
- if (isConnected() && (wrt != null)) {
- logger.debug("Sending message: {}", msg);
- wrt.print(msg + "\r\n");
- wrt.flush();
- if (wrt.checkError()) {
- logger.warn("sendCommand() error sending message to OpenTherm Gateway => PLEASE REPORT !!");
- stop();
+ public void sendCommand(GatewayCommand command) {
+ synchronized (pendingCommands) {
+ // append the command to the end of the FIFO queue
+ if (MAXIMUM_FIFO_BUFFER_SIZE < pendingCommands.size()
+ || !pendingCommands.offer(new PendingCommand(command))) {
+ logger.warn("Command refused: FIFO buffer overrun => PLEASE REPORT !!");
}
- } else {
- logger.debug("Unable to send message: {}. OpenThermGatewaySocketConnector is not connected.", msg);
- }
+ // send the FIFO head command, which may or may not be the one just added
+ pendingCommandsSendHeadCommandIfReady();
+ } // release the pendingCommands lock
}
+ /**
+ * Process the incoming message. Remove any expired commands from the queue. Check if the incoming message is an
+ * acknowledgement. If it is the acknowledgement for the FIFO head command, remove it from the queue. Try to send
+ * the (next) FIFO head command, if it exists, and is ready to send. And finally if the message is not an
+ * acknowledgement, check if it is a valid message, and if so, pass it to the gateway Thing handler for processing.
+ *
+ * @param message the incoming message received from the gateway
+ */
private void handleMessage(String message) {
- if (message.length() > 2 && message.charAt(2) == ':') {
- String code = message.substring(0, 2);
- String value = message.substring(3);
+ // check if the message is a command acknowledgement e.g. having the form "XX: yyy"
+ boolean isCommandAcknowledgement = (message.length() > 2) && (message.charAt(2) == ':');
- logger.debug("Received command confirmation: {}: {}", code, value);
- pendingCommands.remove(code);
- return;
- }
+ synchronized (pendingCommands) {
+ // remove all expired commands
+ pendingCommandsRemoveAllExpiredCommands();
- long currentTime = System.currentTimeMillis();
+ // if acknowledgement is for the FIFO head command, remove it from the queue
+ if (isCommandAcknowledgement) {
+ pendingCommandsRemoveHeadCommandIfAcknowledgement(message);
+ }
+
+ // (re-)send the FIFO head command, if it exists and is ready to send
+ pendingCommandsSendHeadCommandIfReady();
+ } // release the pendingCommands lock
- for (Entry<Long, GatewayCommand> timeAndCommand : pendingCommands.values()) {
- long responseTime = timeAndCommand.getKey() + COMMAND_RESPONSE_MIN_WAIT_TIME_MILLISECONDS;
- long timeoutTime = timeAndCommand.getKey() + COMMAND_RESPONSE_MAX_WAIT_TIME_MILLISECONDS;
+ if (isCommandAcknowledgement) {
+ callback.receiveAcknowledgement(message);
+ } else if (message.startsWith(WDT_RESET_RESPONSE_MESSAGE)) {
+ logger.warn("OpenTherm Gateway was reset by its Watch-Dog Timer!");
+ } else {
+ Message msg = Message.parse(message);
- if (currentTime > responseTime && currentTime <= timeoutTime) {
- logger.debug("Resending command: {}", timeAndCommand.getValue());
- sendCommand(timeAndCommand.getValue());
- } else if (currentTime > timeoutTime) {
- pendingCommands.remove(timeAndCommand.getValue().getCode());
+ // ignore and log bad messages
+ if (msg == null) {
+ logger.debug("Received message: {}, (unknown)", message);
+ return;
}
- }
- Message msg = Message.parse(message);
+ // pass good messages to the Thing handler for processing
+ if (msg.getMessageType() == MessageType.READACK || msg.getMessageType() == MessageType.WRITEDATA
+ || msg.getID() == 0 || msg.getID() == 1) {
+ callback.receiveMessage(msg);
+ }
+ }
+ }
- if (msg == null) {
- logger.trace("Received message: {}, (unknown)", message);
- return;
+ /**
+ * If there is a FIFO head command that is ready to (re-)send, then send it.
+ */
+ private void pendingCommandsSendHeadCommandIfReady() {
+ // process the command at the head of the queue
+ PendingCommand headCommand = pendingCommands.peek();
+ if (headCommand != null && headCommand.readyToSend()) {
+ String message = headCommand.command.toFullString();
+
+ // transmit the command string
+ PrintWriter writer = this.writer;
+ if (isConnected() && (writer != null)) {
+ writer.print(message + "\r\n");
+ writer.flush();
+ if (writer.checkError()) {
+ logger.warn("Error sending command to OpenTherm Gateway => PLEASE REPORT !!");
+ stop();
+ }
+ if (logger.isTraceEnabled()) {
+ logger.trace("Sent: {}{}", message, headCommand.sent() ? " (repeat)" : "");
+ }
+ headCommand.sentTime = Instant.now();
+ } else {
+ logger.debug("Unable to send command: {}. OpenThermGatewaySocketConnector is not connected.", message);
+ }
}
- logger.trace("Received message: {}, {} {} {}", message, msg.getID(), msg.getCodeType(), msg.getMessageType());
- if (msg.getMessageType() == MessageType.READACK || msg.getMessageType() == MessageType.WRITEDATA
- || msg.getID() == 0 || msg.getID() == 1) {
- callback.receiveMessage(msg);
+ }
+
+ /**
+ * If the acknowledgement message corresponds to the FIFO head command then remove it from the queue.
+ *
+ * @param message must be an acknowledgement message in the form "XX: yyy"
+ */
+ private void pendingCommandsRemoveHeadCommandIfAcknowledgement(String message) {
+ PendingCommand headCommand = pendingCommands.peek();
+ if (headCommand != null && headCommand.command.getCode().equals(message.substring(0, 2))) {
+ pendingCommands.poll();
}
}
+
+ /**
+ * Remove all expired commands from the queue.
+ */
+ private void pendingCommandsRemoveAllExpiredCommands() {
+ pendingCommands.removeIf(pendingCommand -> pendingCommand.expired());
+ }
}