]> git.basschouten.com Git - openhab-addons.git/commitdiff
[openthermgateway] Various improvements (#12507)
authorAndrew Fiddian-Green <software@whitebear.ch>
Mon, 28 Mar 2022 19:40:35 +0000 (20:40 +0100)
committerGitHub <noreply@github.com>
Mon, 28 Mar 2022 19:40:35 +0000 (21:40 +0200)
* [openthermgateway] various tweaks

Signed-off-by: Andrew Fiddian-Green <software@whitebear.ch>
bundles/org.openhab.binding.openthermgateway/src/main/java/org/openhab/binding/openthermgateway/handler/OpenThermGatewayHandler.java
bundles/org.openhab.binding.openthermgateway/src/main/java/org/openhab/binding/openthermgateway/internal/OpenThermGatewayCallback.java
bundles/org.openhab.binding.openthermgateway/src/main/java/org/openhab/binding/openthermgateway/internal/OpenThermGatewaySocketConnector.java
bundles/org.openhab.binding.openthermgateway/src/main/resources/OH-INF/thing/openthermgateway.xml

index 33b7ae7a3cd73b177d22aaeb955e0301268893dc..06809cd3a5f6ae4a97d02fb66607cd1386913576 100644 (file)
@@ -50,6 +50,8 @@ import org.slf4j.LoggerFactory;
  */
 @NonNullByDefault
 public class OpenThermGatewayHandler extends BaseBridgeHandler implements OpenThermGatewayCallback {
+    private static final String PROPERTY_GATEWAY_ID_NAME = "gatewayId";
+    private static final String PROPERTY_GATEWAY_ID_TAG = "PR: A=";
 
     private final Logger logger = LoggerFactory.getLogger(OpenThermGatewayHandler.class);
 
@@ -181,6 +183,18 @@ public class OpenThermGatewayHandler extends BaseBridgeHandler implements OpenTh
         }
     }
 
+    @Override
+    public void receiveAcknowledgement(String message) {
+        scheduler.submit(() -> receiveAcknowledgementTask(message));
+    }
+
+    private void receiveAcknowledgementTask(String message) {
+        if (message.startsWith(PROPERTY_GATEWAY_ID_TAG)) {
+            getThing().setProperty(PROPERTY_GATEWAY_ID_NAME,
+                    message.substring(PROPERTY_GATEWAY_ID_TAG.length()).strip());
+        }
+    }
+
     @Override
     public void handleRemoval() {
         logger.debug("Removing OpenThermGateway handler");
index d78a9a35a11e264977870d33640b97e3f647b07a..2d6f392d97a57caa7892145103e65430403501cb 100644 (file)
@@ -17,7 +17,7 @@ import org.eclipse.jdt.annotation.NonNullByDefault;
 /**
  * The {@link OpenThermGatewayCallback} is used as a callback interface by a connector to signal status
  * and relay incoming messages to be processed by the binding.
- * 
+ *
  * @author Arjen Korevaar - Initial contribution
  */
 @NonNullByDefault
@@ -26,4 +26,6 @@ public interface OpenThermGatewayCallback {
     void connectionStateChanged(ConnectionState state);
 
     void receiveMessage(Message message);
+
+    void receiveAcknowledgement(String message);
 }
index 7e8f0942dbadc8b83a3bbd1625cc140e127b49b5..c6865c6a9cf66f74ba5c88f86191b59f7488daab 100644 (file)
@@ -20,10 +20,9 @@ import java.io.InputStreamReader;
 import java.io.PrintWriter;
 import java.net.InetSocketAddress;
 import java.net.Socket;
-import java.util.AbstractMap;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
+import java.time.Instant;
+import java.util.ArrayDeque;
+import java.util.Queue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -42,11 +41,15 @@ import org.slf4j.LoggerFactory;
  *
  * @author Arjen Korevaar - Initial contribution
  * @author Arjan Mels - Improved robustness by re-sending commands, handling all message types (not only Boiler)
+ * @author Andrew Fiddian-Green - Improve thread interruption, socket timeouts, exception handling, FIFO command queue
  */
 @NonNullByDefault
 public class OpenThermGatewaySocketConnector implements OpenThermGatewayConnector {
     private static final int COMMAND_RESPONSE_MIN_WAIT_TIME_MILLISECONDS = 100;
     private static final int COMMAND_RESPONSE_MAX_WAIT_TIME_MILLISECONDS = 5000;
+    private static final int MAXIMUM_FIFO_BUFFER_SIZE = 20;
+
+    private static final String WDT_RESET_RESPONSE_MESSAGE = "WDT reset";
 
     private final Logger logger = LoggerFactory.getLogger(OpenThermGatewaySocketConnector.class);
 
@@ -61,7 +64,56 @@ public class OpenThermGatewaySocketConnector implements OpenThermGatewayConnecto
     private @Nullable Future<Boolean> future;
     private @Nullable ExecutorService executor;
 
-    private Map<String, Entry<Long, GatewayCommand>> pendingCommands = new ConcurrentHashMap<>();
+    /**
+     * FIFO queue of commands that are pending being sent to the gateway. That is commands that are either not yet sent,
+     * or sent but not yet acknowledged and pending possible re-sending.
+     *
+     * Note: we must use 'synchronized' when accessing this object to ensure proper thread safety.
+     */
+    private final Queue<PendingCommand> pendingCommands = new ArrayDeque<>(MAXIMUM_FIFO_BUFFER_SIZE);
+
+    /**
+     * Wrapper for a command entry in the pending command FIFO queue.
+     *
+     * @author AndrewFG - initial contribution
+     */
+    private class PendingCommand {
+        protected final GatewayCommand command;
+        protected final Instant expiryTime = Instant.now().plusMillis(COMMAND_RESPONSE_MAX_WAIT_TIME_MILLISECONDS);
+        protected Instant sentTime = Instant.MIN;
+
+        protected PendingCommand(GatewayCommand command) {
+            this.command = command;
+        }
+
+        /**
+         * Check if the command has been sent to the gateway.
+         *
+         * @return true if it has been sent
+         */
+        protected boolean sent() {
+            return Instant.MIN.isBefore(sentTime);
+        }
+
+        /**
+         * Check if the command is ready to send (or re-send) to the gateway.
+         *
+         * @return true if the command has either not been sent, or sent but not acknowledged within due time i.e. it
+         *         needs to be re-sent
+         */
+        protected boolean readyToSend() {
+            return sentTime.isBefore(Instant.now().minusMillis(COMMAND_RESPONSE_MIN_WAIT_TIME_MILLISECONDS));
+        }
+
+        /**
+         * Check if the command has expired.
+         *
+         * @return true if the expiry time has expired
+         */
+        protected boolean expired() {
+            return Instant.now().isAfter(expiryTime);
+        }
+    }
 
     public OpenThermGatewaySocketConnector(OpenThermGatewayCallback callback, OpenThermGatewayConfiguration config) {
         this.callback = callback;
@@ -98,12 +150,15 @@ public class OpenThermGatewaySocketConnector implements OpenThermGatewayConnecto
                     String message = reader.readLine();
 
                     if (message != null) {
+                        logger.trace("Read: {}", message);
                         handleMessage(message);
                     } else {
                         logger.debug("Received NULL message from OpenTherm Gateway (EOF)");
                         break;
                     }
                 }
+                // disable reporting every message (for cleaner re-starting)
+                sendCommand(GatewayCommand.parse(GatewayCommandCode.PRINTSUMMARY, "1"));
             } catch (IOException ex) {
                 logger.warn("Error communicating with OpenTherm Gateway: '{}'", ex.getMessage());
             }
@@ -162,61 +217,108 @@ public class OpenThermGatewaySocketConnector implements OpenThermGatewayConnecto
     }
 
     @Override
-    public synchronized void sendCommand(GatewayCommand command) {
-        PrintWriter wrt = writer;
-
-        pendingCommands.put(command.getCode(),
-                new AbstractMap.SimpleImmutableEntry<>(System.currentTimeMillis(), command));
-
-        String msg = command.toFullString();
-
-        if (isConnected() && (wrt != null)) {
-            logger.debug("Sending message: {}", msg);
-            wrt.print(msg + "\r\n");
-            wrt.flush();
-            if (wrt.checkError()) {
-                logger.warn("sendCommand() error sending message to OpenTherm Gateway => PLEASE REPORT !!");
-                stop();
+    public void sendCommand(GatewayCommand command) {
+        synchronized (pendingCommands) {
+            // append the command to the end of the FIFO queue
+            if (MAXIMUM_FIFO_BUFFER_SIZE < pendingCommands.size()
+                    || !pendingCommands.offer(new PendingCommand(command))) {
+                logger.warn("Command refused: FIFO buffer overrun => PLEASE REPORT !!");
             }
-        } else {
-            logger.debug("Unable to send message: {}. OpenThermGatewaySocketConnector is not connected.", msg);
-        }
+            // send the FIFO head command, which may or may not be the one just added
+            pendingCommandsSendHeadCommandIfReady();
+        } // release the pendingCommands lock
     }
 
+    /**
+     * Process the incoming message. Remove any expired commands from the queue. Check if the incoming message is an
+     * acknowledgement. If it is the acknowledgement for the FIFO head command, remove it from the queue. Try to send
+     * the (next) FIFO head command, if it exists, and is ready to send. And finally if the message is not an
+     * acknowledgement, check if it is a valid message, and if so, pass it to the gateway Thing handler for processing.
+     *
+     * @param message the incoming message received from the gateway
+     */
     private void handleMessage(String message) {
-        if (message.length() > 2 && message.charAt(2) == ':') {
-            String code = message.substring(0, 2);
-            String value = message.substring(3);
+        // check if the message is a command acknowledgement e.g. having the form "XX: yyy"
+        boolean isCommandAcknowledgement = (message.length() > 2) && (message.charAt(2) == ':');
 
-            logger.debug("Received command confirmation: {}: {}", code, value);
-            pendingCommands.remove(code);
-            return;
-        }
+        synchronized (pendingCommands) {
+            // remove all expired commands
+            pendingCommandsRemoveAllExpiredCommands();
 
-        long currentTime = System.currentTimeMillis();
+            // if acknowledgement is for the FIFO head command, remove it from the queue
+            if (isCommandAcknowledgement) {
+                pendingCommandsRemoveHeadCommandIfAcknowledgement(message);
+            }
+
+            // (re-)send the FIFO head command, if it exists and is ready to send
+            pendingCommandsSendHeadCommandIfReady();
+        } // release the pendingCommands lock
 
-        for (Entry<Long, GatewayCommand> timeAndCommand : pendingCommands.values()) {
-            long responseTime = timeAndCommand.getKey() + COMMAND_RESPONSE_MIN_WAIT_TIME_MILLISECONDS;
-            long timeoutTime = timeAndCommand.getKey() + COMMAND_RESPONSE_MAX_WAIT_TIME_MILLISECONDS;
+        if (isCommandAcknowledgement) {
+            callback.receiveAcknowledgement(message);
+        } else if (message.startsWith(WDT_RESET_RESPONSE_MESSAGE)) {
+            logger.warn("OpenTherm Gateway was reset by its Watch-Dog Timer!");
+        } else {
+            Message msg = Message.parse(message);
 
-            if (currentTime > responseTime && currentTime <= timeoutTime) {
-                logger.debug("Resending command: {}", timeAndCommand.getValue());
-                sendCommand(timeAndCommand.getValue());
-            } else if (currentTime > timeoutTime) {
-                pendingCommands.remove(timeAndCommand.getValue().getCode());
+            // ignore and log bad messages
+            if (msg == null) {
+                logger.debug("Received message: {}, (unknown)", message);
+                return;
             }
-        }
 
-        Message msg = Message.parse(message);
+            // pass good messages to the Thing handler for processing
+            if (msg.getMessageType() == MessageType.READACK || msg.getMessageType() == MessageType.WRITEDATA
+                    || msg.getID() == 0 || msg.getID() == 1) {
+                callback.receiveMessage(msg);
+            }
+        }
+    }
 
-        if (msg == null) {
-            logger.trace("Received message: {}, (unknown)", message);
-            return;
+    /**
+     * If there is a FIFO head command that is ready to (re-)send, then send it.
+     */
+    private void pendingCommandsSendHeadCommandIfReady() {
+        // process the command at the head of the queue
+        PendingCommand headCommand = pendingCommands.peek();
+        if (headCommand != null && headCommand.readyToSend()) {
+            String message = headCommand.command.toFullString();
+
+            // transmit the command string
+            PrintWriter writer = this.writer;
+            if (isConnected() && (writer != null)) {
+                writer.print(message + "\r\n");
+                writer.flush();
+                if (writer.checkError()) {
+                    logger.warn("Error sending command to OpenTherm Gateway => PLEASE REPORT !!");
+                    stop();
+                }
+                if (logger.isTraceEnabled()) {
+                    logger.trace("Sent: {}{}", message, headCommand.sent() ? " (repeat)" : "");
+                }
+                headCommand.sentTime = Instant.now();
+            } else {
+                logger.debug("Unable to send command: {}. OpenThermGatewaySocketConnector is not connected.", message);
+            }
         }
-        logger.trace("Received message: {}, {} {} {}", message, msg.getID(), msg.getCodeType(), msg.getMessageType());
-        if (msg.getMessageType() == MessageType.READACK || msg.getMessageType() == MessageType.WRITEDATA
-                || msg.getID() == 0 || msg.getID() == 1) {
-            callback.receiveMessage(msg);
+    }
+
+    /**
+     * If the acknowledgement message corresponds to the FIFO head command then remove it from the queue.
+     *
+     * @param message must be an acknowledgement message in the form "XX: yyy"
+     */
+    private void pendingCommandsRemoveHeadCommandIfAcknowledgement(String message) {
+        PendingCommand headCommand = pendingCommands.peek();
+        if (headCommand != null && headCommand.command.getCode().equals(message.substring(0, 2))) {
+            pendingCommands.poll();
         }
     }
+
+    /**
+     * Remove all expired commands from the queue.
+     */
+    private void pendingCommandsRemoveAllExpiredCommands() {
+        pendingCommands.removeIf(pendingCommand -> pendingCommand.expired());
+    }
 }
index ac5a39d6a3467d8a942041531576dd745f326894..4f4337164f93d27f6f843f2258f7c034d4bf3805 100644 (file)
@@ -10,7 +10,7 @@
                        <channel id="sendcommand" typeId="sendcommand"/>
                </channels>
                <properties>
-                       <property name="version">2.2.0</property>
+                       <property name="version">2.2.1</property>
                </properties>
                <config-description-ref uri="thing-type:openthermgateway:openthermgateway"/>
        </bridge-type>