]> git.basschouten.com Git - openhab-addons.git/commitdiff
[velux] Improve shutdown exception handling (#12356)
authorAndrew Fiddian-Green <software@whitebear.ch>
Sat, 19 Mar 2022 14:00:30 +0000 (14:00 +0000)
committerGitHub <noreply@github.com>
Sat, 19 Mar 2022 14:00:30 +0000 (15:00 +0100)
* [velux] add isDisposing flag to accelerate shutdown

Signed-off-by: Andrew Fiddian-Green <software@whitebear.ch>
* [velux] refactor Poller into a separate class

Signed-off-by: Andrew Fiddian-Green <software@whitebear.ch>
* [velux] use new Poller class; fix startup, shutdown, and exception code

Signed-off-by: Andrew Fiddian-Green <software@whitebear.ch>
* [velux] demote confusing log message

Signed-off-by: Andrew Fiddian-Green <software@whitebear.ch>
* [velux] slightly more elegant interrupt flag set / check

Signed-off-by: Andrew Fiddian-Green <software@whitebear.ch>
bundles/org.openhab.binding.velux/src/main/java/org/openhab/binding/velux/internal/bridge/slip/io/Connection.java
bundles/org.openhab.binding.velux/src/main/java/org/openhab/binding/velux/internal/bridge/slip/io/DataInputStreamWithTimeout.java
bundles/org.openhab.binding.velux/src/main/java/org/openhab/binding/velux/internal/bridge/slip/io/Poller.java [new file with mode: 0644]
bundles/org.openhab.binding.velux/src/main/java/org/openhab/binding/velux/internal/handler/VeluxBridgeHandler.java

index f0958e41a9d6da0123f4085c769bb630e562634e..dc80b0994d8f4dfa0c2637400193517f3d10443c 100644 (file)
@@ -147,14 +147,18 @@ public class Connection implements Closeable {
             } catch (IOException ioe) {
                 logger.info("io() on {}: Exception occurred during I/O: {}.", host, ioe.getMessage());
                 lastIOE = ioe;
-                // Error Retries with Exponential Backoff
-                long waitTime = ((long) Math.pow(2, retryCount)
-                        * bridgeInstance.veluxBridgeConfiguration().timeoutMsecs);
-                logger.trace("io() on {}: wait time {} msecs.", host, waitTime);
-                try {
-                    Thread.sleep(waitTime);
-                } catch (InterruptedException ie) {
-                    logger.trace("io() on {}: wait interrupted.", host);
+                if (bridgeInstance.isDisposing()) {
+                    break;
+                } else {
+                    // Error Retries with Exponential Backoff
+                    long waitTime = ((long) Math.pow(2, retryCount)
+                            * bridgeInstance.veluxBridgeConfiguration().timeoutMsecs);
+                    logger.trace("io() on {}: wait time {} msecs.", host, waitTime);
+                    try {
+                        Thread.sleep(waitTime);
+                    } catch (InterruptedException ie) {
+                        logger.trace("io() on {}: wait interrupted.", host);
+                    }
                 }
             }
         } while (retryCount++ < bridgeInstance.veluxBridgeConfiguration().retries);
index 469ede68c7892cb577334720d223f41516cd6127..98c0d5620d8811e83f0e66407d2b7a7aadf50e87 100644 (file)
@@ -15,16 +15,15 @@ package org.openhab.binding.velux.internal.bridge.slip.io;
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStream;
-import java.net.SocketTimeoutException;
-import java.util.Arrays;
 import java.util.NoSuchElementException;
 import java.util.Queue;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.eclipse.jdt.annotation.NonNullByDefault;
 import org.eclipse.jdt.annotation.Nullable;
@@ -45,141 +44,34 @@ import org.slf4j.LoggerFactory;
 @NonNullByDefault
 class DataInputStreamWithTimeout implements Closeable {
 
-    private static final int QUEUE_SIZE = 512;
-    private static final int BUFFER_SIZE = 512;
     private static final int SLEEP_INTERVAL_MSECS = 50;
-
-    // special character that marks the first and last byte of a slip message
-    private static final byte SLIP_MARK = (byte) 0xc0;
-    private static final byte SLIP_PROT = 0;
+    private static final long MAX_WAIT_SECONDS = 15;
 
     private final Logger logger = LoggerFactory.getLogger(DataInputStreamWithTimeout.class);
 
     private final Queue<byte[]> slipMessageQueue = new ConcurrentLinkedQueue<>();
+    private final InputStream inputStream;
+    private final VeluxBridgeHandler bridge;
 
-    private InputStream inputStream;
-
-    private @Nullable String pollException = null;
-    private @Nullable Poller pollRunner = null;
-    private ExecutorService executor;
-
-    private class Poller implements Callable<Boolean> {
-
-        private boolean interrupted = false;
-        private Future<Boolean> pollerFinished;
-
-        public Poller(ExecutorService executor) {
-            logger.trace("Poller: created");
-            pollerFinished = executor.submit(this);
-        }
-
-        public void interrupt() {
-            interrupted = true;
-            try {
-                pollerFinished.get();
-            } catch (InterruptedException | ExecutionException e) {
-            }
-        }
-
-        /**
-         * Task that loops to read bytes from {@link InputStream} and build SLIP packets from them. The SLIP packets are
-         * placed in a {@link ConcurrentLinkedQueue}. It loops continuously until 'interrupt()' or 'Thread.interrupt()'
-         * are called when terminates early after the next socket read timeout.
-         */
-        @Override
-        public Boolean call() throws Exception {
-            logger.trace("Poller.call(): started");
-            byte[] buf = new byte[BUFFER_SIZE];
-            int byt;
-            int i = 0;
-
-            // clean start, no exception, empty queue
-            pollException = null;
-            slipMessageQueue.clear();
-
-            // loop forever; on shutdown interrupt() gets called to break out of the loop
-            while (true) {
-                try {
-                    if (interrupted) {
-                        // fully flush the input buffer
-                        inputStream.readAllBytes();
-                        break;
-                    }
-                    byt = inputStream.read();
-                    if (byt < 0) {
-                        // end of stream is OK => keep on polling
-                        continue;
-                    }
-                    buf[i] = (byte) byt;
-                    if ((i > 0) && (buf[i] == SLIP_MARK)) {
-                        // the minimal slip message is 7 bytes [MM PP LL CC CC KK MM]
-                        if ((i > 5) && (buf[0] == SLIP_MARK) && (buf[1] == SLIP_PROT)) {
-                            slipMessageQueue.offer(Arrays.copyOfRange(buf, 0, i + 1));
-                            if (slipMessageQueue.size() > QUEUE_SIZE) {
-                                logger.warn("Poller.call(): slip message queue overflow => PLEASE REPORT !!");
-                                slipMessageQueue.poll();
-                            }
-                            i = 0;
-                        } else {
-                            logger.warn("Poller.call(): non slip messsage discarded => PLEASE REPORT !!");
-                            buf[0] = SLIP_MARK;
-                            i = 1;
-                        }
-                        continue;
-                    }
-                    if (++i >= BUFFER_SIZE) {
-                        logger.warn("Poller.call(): input buffer overrun => PLEASE REPORT !!");
-                        i = 0;
-                    }
-                } catch (SocketTimeoutException e) {
-                    // socket read time outs are OK => keep on polling; unless interrupted
-                    if (interrupted) {
-                        break;
-                    }
-                    continue;
-                } catch (IOException e) {
-                    // any other exception => stop polling
-                    String msg = e.getMessage();
-                    pollException = msg != null ? msg : "Generic IOException";
-                    logger.debug("Poller.call(): stopping '{}'", pollException);
-                    break;
-                }
-            }
-
-            logger.trace("Poller.call(): ended");
-            // we only get here if shutdown or an error occurs so free ourself so we can be recreated again
-            pollRunner = null;
-            return true;
-        }
-    }
-
-    /**
-     * Check if there was an exception on the polling loop task and if so, throw it back on the caller thread.
-     *
-     * @throws IOException
-     */
-    private void throwIfPollException() throws IOException {
-        if (pollException != null) {
-            logger.debug("passPollException() polling loop exception {}", pollException);
-            throw new IOException(pollException);
-        }
-    }
+    private @Nullable Poller poller;
+    private @Nullable Future<Boolean> future;
+    private @Nullable ExecutorService executor;
 
     /**
      * Creates a {@link DataInputStreamWithTimeout} as a wrapper around the specified underlying {@link InputStream}
      *
-     * @param stream the specified input stream
+     * @param inputStream the specified input stream
      * @param bridge the actual Bridge Thing instance
      */
-    public DataInputStreamWithTimeout(InputStream stream, VeluxBridgeHandler bridge) {
-        inputStream = stream;
-        executor = Executors.newSingleThreadExecutor(bridge.getThreadFactory());
+    public DataInputStreamWithTimeout(InputStream inputStream, VeluxBridgeHandler bridge) {
+        this.inputStream = inputStream;
+        this.bridge = bridge;
     }
 
     /**
-     * Overridden method of {@link Closeable} interface. Stops the polling thread.
+     * Overridden method of {@link Closeable} interface. Stops the polling task.
      *
-     * @throws IOException
+     * @throws IOException (although actually no exceptions are thrown)
      */
     @Override
     public void close() throws IOException {
@@ -192,7 +84,8 @@ class DataInputStreamWithTimeout implements Closeable {
      *
      * @param timeoutMSecs the timeout period in milliseconds.
      * @return the next SLIP message if there is one on the queue, or any empty byte[] array if not.
-     * @throws IOException
+     * @throws IOException if the poller task has unexpectedly terminated e.g. via an IOException, or if either the
+     *             poller task, or the calling thread have been interrupted
      */
     public synchronized byte[] readSlipMessage(int timeoutMSecs) throws IOException {
         startPolling();
@@ -203,16 +96,22 @@ class DataInputStreamWithTimeout implements Closeable {
                 logger.trace("readSlipMessage() => return slip message");
                 return slip;
             } catch (NoSuchElementException e) {
-                // queue empty, wait and continue
+                // queue empty, fall through and continue
             }
-            throwIfPollException();
             try {
-                Thread.sleep(SLEEP_INTERVAL_MSECS);
-            } catch (InterruptedException e) {
-                logger.debug("readSlipMessage() => thread interrupt");
+                Future<Boolean> future = this.future;
+                if ((future != null) && future.isDone()) {
+                    future.get(); // throws ExecutionException, InterruptedException
+                    // future terminated without exception, but prematurely, which is itself an exception
+                    throw new IOException("Poller thread terminated prematurely");
+                }
+                Thread.sleep(SLEEP_INTERVAL_MSECS); // throws InterruptedException
+            } catch (ExecutionException | InterruptedException e) {
+                // re-cast other exceptions as IOException
+                throw new IOException(e);
             }
         }
-        logger.debug("readSlipMessage() => no slip message after {}mS => time out", timeoutMSecs);
+        logger.debug("readSlipMessage() => no slip message");
         return new byte[0];
     }
 
@@ -239,9 +138,12 @@ class DataInputStreamWithTimeout implements Closeable {
      * Start the polling task
      */
     private void startPolling() {
-        if (pollRunner == null) {
-            logger.trace("startPolling()");
-            pollRunner = new Poller(executor);
+        if (future == null) {
+            logger.debug("startPolling() called");
+            slipMessageQueue.clear();
+            poller = new Poller(inputStream, slipMessageQueue);
+            executor = Executors.newSingleThreadExecutor(bridge.getThreadFactory());
+            future = executor.submit(poller);
         }
     }
 
@@ -249,11 +151,31 @@ class DataInputStreamWithTimeout implements Closeable {
      * Stop the polling task
      */
     private void stopPolling() {
-        Poller pollRunner = this.pollRunner;
-        if (pollRunner != null) {
-            logger.trace("stopPolling()");
-            pollRunner.interrupt();
+        logger.debug("stopPolling() called");
+
+        Poller poller = this.poller;
+        Future<Boolean> future = this.future;
+        ExecutorService executor = this.executor;
+
+        this.poller = null;
+        this.future = null;
+        this.executor = null;
+
+        if (executor != null) {
+            executor.shutdown();
+        }
+        if (poller != null) {
+            poller.interrupt();
+        }
+        if (future != null) {
+            try {
+                future.get(MAX_WAIT_SECONDS, TimeUnit.SECONDS);
+            } catch (ExecutionException e) {
+                // expected exception due to e.g. IOException on socket close
+            } catch (TimeoutException | InterruptedException e) {
+                // unexpected exception due to e.g. KLF200 'zombie state'
+                logger.warn("stopPolling() exception '{}' => PLEASE REPORT !!", e.getMessage());
+            }
         }
-        executor.shutdown();
     }
 }
diff --git a/bundles/org.openhab.binding.velux/src/main/java/org/openhab/binding/velux/internal/bridge/slip/io/Poller.java b/bundles/org.openhab.binding.velux/src/main/java/org/openhab/binding/velux/internal/bridge/slip/io/Poller.java
new file mode 100644 (file)
index 0000000..6c347e1
--- /dev/null
@@ -0,0 +1,121 @@
+/**
+ * Copyright (c) 2010-2022 Contributors to the openHAB project
+ *
+ * See the NOTICE file(s) distributed with this work for additional
+ * information.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0
+ *
+ * SPDX-License-Identifier: EPL-2.0
+ */
+package org.openhab.binding.velux.internal.bridge.slip.io;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.SocketTimeoutException;
+import java.util.Arrays;
+import java.util.Queue;
+import java.util.concurrent.Callable;
+
+import org.eclipse.jdt.annotation.NonNullByDefault;
+import org.eclipse.jdt.annotation.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class implements a Callable to read SLIP messages from the input stream.
+ *
+ * It implements a secondary polling thread to asynchronously read bytes from the socket input stream into a buffer.
+ * And it parses the bytes into SLIP messages, which are placed on a message queue.
+ *
+ * @author Andrew Fiddian-Green - Initial Contribution; refactored from private class in DataInputStreamWithTimeout
+ */
+@NonNullByDefault
+public class Poller implements Callable<Boolean> {
+
+    private static final int BUFFER_SIZE = 512;
+    private static final int QUEUE_SIZE = 512;
+
+    // special character that marks the first and last byte of a slip message
+    private static final byte SLIP_MARK = (byte) 0xc0;
+    private static final byte SLIP_PROT = 0;
+
+    private final Logger logger = LoggerFactory.getLogger(Poller.class);
+
+    private final InputStream inputStream;
+    private final Queue<byte[]> messageQueue;
+
+    private @Nullable volatile Thread thread;
+
+    public Poller(InputStream stream, Queue<byte[]> queue) {
+        logger.trace("Poller: created");
+        inputStream = stream;
+        messageQueue = queue;
+    }
+
+    public void interrupt() {
+        Thread thread = this.thread;
+        if ((thread != null) && thread.isAlive()) {
+            thread.interrupt();
+        }
+    }
+
+    /**
+     * Task that loops to read bytes from inputStream and build SLIP packets from them. The SLIP packets are placed in
+     * messageQueue. It runs until 'interrupt()' or 'Thread.interrupt()' are called.
+     *
+     * @throws IOException in case of socket read errors
+     */
+    @Override
+    public Boolean call() throws IOException {
+        thread = Thread.currentThread();
+        logger.trace("Poller.call(): started");
+        byte[] buf = new byte[BUFFER_SIZE];
+        int byt;
+        int i = 0;
+
+        while (!Thread.currentThread().isInterrupted()) {
+            try {
+                byt = inputStream.read(); // throws IOException
+                // end of stream is OK => continue polling
+                if (byt < 0) {
+                    continue;
+                }
+            } catch (SocketTimeoutException e) {
+                // socket read time out is OK => continue polling
+                continue;
+            }
+            buf[i] = (byte) byt;
+            if ((i > 0) && (buf[i] == SLIP_MARK)) {
+                // the minimal slip message is 7 bytes [MM PP LL CC CC KK MM]
+                if ((i > 5) && (buf[0] == SLIP_MARK) && (buf[1] == SLIP_PROT)) {
+                    messageQueue.offer(Arrays.copyOfRange(buf, 0, i + 1));
+                    if (messageQueue.size() > QUEUE_SIZE) {
+                        logger.warn("Poller.call(): slip message queue overflow => PLEASE REPORT !!");
+                        messageQueue.poll();
+                    }
+                    i = 0;
+                } else {
+                    if (logger.isWarnEnabled()) {
+                        StringBuilder sb = new StringBuilder();
+                        for (int j = 0; j <= i; j++) {
+                            sb.append(String.format("%02X ", buf[j]));
+                        }
+                        logger.warn("Poller.call(): non slip messsage {} discarded => PLEASE REPORT !!", sb.toString());
+                    }
+                    buf[0] = SLIP_MARK;
+                    i = 1;
+                }
+                continue;
+            }
+            if (++i >= BUFFER_SIZE) {
+                logger.warn("Poller.call(): input buffer overrun => PLEASE REPORT !!");
+                i = 0;
+            }
+        }
+        logger.trace("Poller.call(): completed");
+        return true;
+    }
+}
index 87909569b8bd8790bf7afb0716d9b183cdb7ad1c..c8f39b38217b4b14c756da48dddfdf9e9aabf8fc 100644 (file)
@@ -135,6 +135,7 @@ public class VeluxBridgeHandler extends ExtendedBaseBridgeHandler implements Vel
 
     private VeluxBridge myJsonBridge = new JsonVeluxBridge(this);
     private VeluxBridge mySlipBridge = new SlipVeluxBridge(this);
+    private boolean disposing = false;
 
     /*
      * **************************************
@@ -279,6 +280,7 @@ public class VeluxBridgeHandler extends ExtendedBaseBridgeHandler implements Vel
         veluxBridgeConfiguration = new VeluxBinding(getConfigAs(VeluxBridgeConfiguration.class)).checked();
 
         scheduler.execute(() -> {
+            disposing = false;
             initializeSchedulerJob();
         });
     }
@@ -314,6 +316,7 @@ public class VeluxBridgeHandler extends ExtendedBaseBridgeHandler implements Vel
     @Override
     public void dispose() {
         scheduler.submit(() -> {
+            disposing = true;
             disposeSchedulerJob();
         });
     }
@@ -882,4 +885,13 @@ public class VeluxBridgeHandler extends ExtendedBaseBridgeHandler implements Vel
         }
         return threadFactory;
     }
+
+    /**
+     * Indicates if the bridge thing is being disposed.
+     *
+     * @return true if the bridge thing is being disposed.
+     */
+    public boolean isDisposing() {
+        return disposing;
+    }
 }